~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Martin Pool
  • Date: 2006-03-10 06:29:53 UTC
  • mfrom: (1608 +trunk)
  • mto: This revision was merged to the branch mainline in revision 1611.
  • Revision ID: mbp@sourcefrog.net-20060310062953-bc1c7ade75c89a7a
[merge] bzr.dev; pycurl not updated for readv yet

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 by Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for Knit data structure"""
 
18
 
 
19
 
 
20
import difflib
 
21
 
 
22
 
 
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent
 
24
from bzrlib.knit import KnitVersionedFile, KnitPlainFactory, KnitAnnotateFactory
 
25
from bzrlib.osutils import split_lines
 
26
from bzrlib.tests import TestCaseInTempDir
 
27
from bzrlib.transport import TransportLogger
 
28
from bzrlib.transport.local import LocalTransport
 
29
from bzrlib.transport.memory import MemoryTransport
 
30
 
 
31
 
 
32
class KnitTests(TestCaseInTempDir):
 
33
 
 
34
    def add_stock_one_and_one_a(self, k):
 
35
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
36
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
 
37
 
 
38
    def test_knit_constructor(self):
 
39
        """Construct empty k"""
 
40
        self.make_test_knit()
 
41
 
 
42
    def make_test_knit(self, annotate=False):
 
43
        if not annotate:
 
44
            factory = KnitPlainFactory()
 
45
        else:
 
46
            factory = None
 
47
        return KnitVersionedFile('test', LocalTransport('.'), access_mode='w', factory=factory, create=True)
 
48
 
 
49
    def test_knit_add(self):
 
50
        """Store one text in knit and retrieve"""
 
51
        k = self.make_test_knit()
 
52
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
53
        self.assertTrue(k.has_version('text-1'))
 
54
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
55
 
 
56
    def test_knit_reload(self):
 
57
        """Store and reload a knit"""
 
58
        k = self.make_test_knit()
 
59
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
60
        del k
 
61
        k2 = KnitVersionedFile('test', LocalTransport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
62
        self.assertTrue(k2.has_version('text-1'))
 
63
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
 
64
 
 
65
    def test_knit_several(self):
 
66
        """Store several texts in a knit"""
 
67
        k = self.make_test_knit()
 
68
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
69
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
70
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
71
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
 
72
        
 
73
    def test_repeated_add(self):
 
74
        """Knit traps attempt to replace existing version"""
 
75
        k = self.make_test_knit()
 
76
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
77
        self.assertRaises(RevisionAlreadyPresent, 
 
78
                k.add_lines,
 
79
                'text-1', [], split_lines(TEXT_1))
 
80
 
 
81
    def test_empty(self):
 
82
        k = self.make_test_knit(True)
 
83
        k.add_lines('text-1', [], [])
 
84
        self.assertEquals(k.get_lines('text-1'), [])
 
85
 
 
86
    def test_incomplete(self):
 
87
        """Test if texts without a ending line-end can be inserted and
 
88
        extracted."""
 
89
        k = KnitVersionedFile('test', LocalTransport('.'), delta=False, create=True)
 
90
        k.add_lines('text-1', [], ['a\n',    'b'  ])
 
91
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
 
92
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
 
93
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
 
94
 
 
95
    def test_delta(self):
 
96
        """Expression of knit delta as lines"""
 
97
        k = self.make_test_knit()
 
98
        td = list(line_delta(TEXT_1.splitlines(True),
 
99
                             TEXT_1A.splitlines(True)))
 
100
        self.assertEqualDiff(''.join(td), delta_1_1a)
 
101
        out = apply_line_delta(TEXT_1.splitlines(True), td)
 
102
        self.assertEqualDiff(''.join(out), TEXT_1A)
 
103
 
 
104
    def test_add_with_parents(self):
 
105
        """Store in knit with parents"""
 
106
        k = self.make_test_knit()
 
107
        self.add_stock_one_and_one_a(k)
 
108
        self.assertEquals(k.get_parents('text-1'), [])
 
109
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
 
110
 
 
111
    def test_ancestry(self):
 
112
        """Store in knit with parents"""
 
113
        k = self.make_test_knit()
 
114
        self.add_stock_one_and_one_a(k)
 
115
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
 
116
 
 
117
    def test_add_delta(self):
 
118
        """Store in knit with parents"""
 
119
        k = KnitVersionedFile('test', LocalTransport('.'), factory=KnitPlainFactory(),
 
120
            delta=True, create=True)
 
121
        self.add_stock_one_and_one_a(k)
 
122
        k.clear_cache()
 
123
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
124
 
 
125
    def test_annotate(self):
 
126
        """Annotations"""
 
127
        k = KnitVersionedFile('knit', LocalTransport('.'), factory=KnitAnnotateFactory(),
 
128
            delta=True, create=True)
 
129
        self.insert_and_test_small_annotate(k)
 
130
 
 
131
    def insert_and_test_small_annotate(self, k):
 
132
        """test annotation with k works correctly."""
 
133
        k.add_lines('text-1', [], ['a\n', 'b\n'])
 
134
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
 
135
 
 
136
        origins = k.annotate('text-2')
 
137
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
138
        self.assertEquals(origins[1], ('text-2', 'c\n'))
 
139
 
 
140
    def test_annotate_fulltext(self):
 
141
        """Annotations"""
 
142
        k = KnitVersionedFile('knit', LocalTransport('.'), factory=KnitAnnotateFactory(),
 
143
            delta=False, create=True)
 
144
        self.insert_and_test_small_annotate(k)
 
145
 
 
146
    def test_annotate_merge_1(self):
 
147
        k = self.make_test_knit(True)
 
148
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
 
149
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
 
150
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
 
151
        origins = k.annotate('text-am')
 
152
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
 
153
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
154
 
 
155
    def test_annotate_merge_2(self):
 
156
        k = self.make_test_knit(True)
 
157
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
158
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
159
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
 
160
        origins = k.annotate('text-am')
 
161
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
162
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
163
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
164
 
 
165
    def test_annotate_merge_9(self):
 
166
        k = self.make_test_knit(True)
 
167
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
168
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
169
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
 
170
        origins = k.annotate('text-am')
 
171
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
172
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
173
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
174
 
 
175
    def test_annotate_merge_3(self):
 
176
        k = self.make_test_knit(True)
 
177
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
178
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
 
179
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
 
180
        origins = k.annotate('text-am')
 
181
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
182
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
183
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
184
 
 
185
    def test_annotate_merge_4(self):
 
186
        k = self.make_test_knit(True)
 
187
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
188
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
189
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
 
190
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
 
191
        origins = k.annotate('text-am')
 
192
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
193
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
194
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
195
 
 
196
    def test_annotate_merge_5(self):
 
197
        k = self.make_test_knit(True)
 
198
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
199
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
 
200
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
 
201
        k.add_lines('text-am',
 
202
                    ['text-a1', 'text-a2', 'text-a3'],
 
203
                    ['a\n', 'e\n', 'z\n'])
 
204
        origins = k.annotate('text-am')
 
205
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
206
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
 
207
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
 
208
 
 
209
    def test_annotate_file_cherry_pick(self):
 
210
        k = self.make_test_knit(True)
 
211
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
 
212
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
 
213
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
 
214
        origins = k.annotate('text-3')
 
215
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
216
        self.assertEquals(origins[1], ('text-1', 'b\n'))
 
217
        self.assertEquals(origins[2], ('text-1', 'c\n'))
 
218
 
 
219
    def test_knit_join(self):
 
220
        """Store in knit with parents"""
 
221
        k1 = KnitVersionedFile('test1', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
 
222
        k1.add_lines('text-a', [], split_lines(TEXT_1))
 
223
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
 
224
 
 
225
        k1.add_lines('text-c', [], split_lines(TEXT_1))
 
226
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
 
227
 
 
228
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
 
229
 
 
230
        k2 = KnitVersionedFile('test2', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
 
231
        count = k2.join(k1, version_ids=['text-m'])
 
232
        self.assertEquals(count, 5)
 
233
        self.assertTrue(k2.has_version('text-a'))
 
234
        self.assertTrue(k2.has_version('text-c'))
 
235
 
 
236
    def test_reannotate(self):
 
237
        k1 = KnitVersionedFile('knit1', LocalTransport('.'),
 
238
                               factory=KnitAnnotateFactory(), create=True)
 
239
        # 0
 
240
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
241
        # 1
 
242
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
 
243
 
 
244
        k2 = KnitVersionedFile('test2', LocalTransport('.'),
 
245
                               factory=KnitAnnotateFactory(), create=True)
 
246
        k2.join(k1, version_ids=['text-b'])
 
247
 
 
248
        # 2
 
249
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
 
250
        # 2
 
251
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
 
252
        # 3
 
253
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
 
254
 
 
255
        # test-c will have index 3
 
256
        k1.join(k2, version_ids=['text-c'])
 
257
 
 
258
        lines = k1.get_lines('text-c')
 
259
        self.assertEquals(lines, ['z\n', 'c\n'])
 
260
 
 
261
        origins = k1.annotate('text-c')
 
262
        self.assertEquals(origins[0], ('text-c', 'z\n'))
 
263
        self.assertEquals(origins[1], ('text-b', 'c\n'))
 
264
 
 
265
    def test_extraction_reads_components_once(self):
 
266
        t = MemoryTransport()
 
267
        instrumented_t = TransportLogger(t)
 
268
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
269
        # should read the index
 
270
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
271
        instrumented_t._calls = []
 
272
        # add a text       
 
273
        k1.add_lines('base', [], ['text\n'])
 
274
        # should not have read at all
 
275
        self.assertEqual([], instrumented_t._calls)
 
276
 
 
277
        # add a text
 
278
        k1.add_lines('sub', ['base'], ['text\n', 'text2\n'])
 
279
        # should not have read at all
 
280
        self.assertEqual([], instrumented_t._calls)
 
281
        
 
282
        # read a text
 
283
        k1.get_lines('sub')
 
284
        # should not have read at all
 
285
        self.assertEqual([], instrumented_t._calls)
 
286
 
 
287
        # clear the cache
 
288
        k1.clear_cache()
 
289
 
 
290
        # read a text
 
291
        k1.get_lines('base')
 
292
        # should have read a component
 
293
        # should not have read the first component only
 
294
        self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
 
295
        instrumented_t._calls = []
 
296
        # read again
 
297
        k1.get_lines('base')
 
298
        # should not have read at all
 
299
        self.assertEqual([], instrumented_t._calls)
 
300
        # and now read the other component
 
301
        k1.get_lines('sub')
 
302
        # should have read the second component
 
303
        self.assertEqual([('id.knit', [(87, 93)])], instrumented_t._calls)
 
304
        instrumented_t._calls = []
 
305
 
 
306
        # clear the cache
 
307
        k1.clear_cache()
 
308
        # add a text cold 
 
309
        k1.add_lines('sub2', ['base'], ['text\n', 'text3\n'])
 
310
        # should read the first component only
 
311
        self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
 
312
        
 
313
    def test_iter_lines_reads_in_order(self):
 
314
        t = MemoryTransport()
 
315
        instrumented_t = TransportLogger(t)
 
316
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
317
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
318
        # add texts with no required ordering
 
319
        k1.add_lines('base', [], ['text\n'])
 
320
        k1.add_lines('base2', [], ['text2\n'])
 
321
        k1.clear_cache()
 
322
        instrumented_t._calls = []
 
323
        # request a last-first iteration
 
324
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
 
325
        self.assertEqual([('id.knit', [(0, 87), (87, 90)])], instrumented_t._calls)
 
326
        self.assertEqual(['text\n', 'text2\n'], results)
 
327
 
 
328
    def test_create_empty_annotated(self):
 
329
        k1 = self.make_test_knit(True)
 
330
        # 0
 
331
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
332
        k2 = k1.create_empty('t', MemoryTransport())
 
333
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
 
334
        self.assertEqual(k1.delta, k2.delta)
 
335
        # the generic test checks for empty content and file class
 
336
 
 
337
 
 
338
TEXT_1 = """\
 
339
Banana cup cakes:
 
340
 
 
341
- bananas
 
342
- eggs
 
343
- broken tea cups
 
344
"""
 
345
 
 
346
TEXT_1A = """\
 
347
Banana cup cake recipe
 
348
(serves 6)
 
349
 
 
350
- bananas
 
351
- eggs
 
352
- broken tea cups
 
353
- self-raising flour
 
354
"""
 
355
 
 
356
delta_1_1a = """\
 
357
0,1,2
 
358
Banana cup cake recipe
 
359
(serves 6)
 
360
5,5,1
 
361
- self-raising flour
 
362
"""
 
363
 
 
364
TEXT_2 = """\
 
365
Boeuf bourguignon
 
366
 
 
367
- beef
 
368
- red wine
 
369
- small onions
 
370
- carrot
 
371
- mushrooms
 
372
"""
 
373
 
 
374
def line_delta(from_lines, to_lines):
 
375
    """Generate line-based delta from one text to another"""
 
376
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
 
377
    for op in s.get_opcodes():
 
378
        if op[0] == 'equal':
 
379
            continue
 
380
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
 
381
        for i in range(op[3], op[4]):
 
382
            yield to_lines[i]
 
383
 
 
384
 
 
385
def apply_line_delta(basis_lines, delta_lines):
 
386
    """Apply a line-based perfect diff
 
387
    
 
388
    basis_lines -- text to apply the patch to
 
389
    delta_lines -- diff instructions and content
 
390
    """
 
391
    out = basis_lines[:]
 
392
    i = 0
 
393
    offset = 0
 
394
    while i < len(delta_lines):
 
395
        l = delta_lines[i]
 
396
        a, b, c = map(long, l.split(','))
 
397
        i = i + 1
 
398
        out[offset+a:offset+b] = delta_lines[i:i+c]
 
399
        i = i + c
 
400
        offset = offset + (b - a) + c
 
401
    return out