29
29
from bzrlib.osutils import split_lines
30
30
from bzrlib.tests import TestCaseWithTransport
31
from bzrlib.transport import TransportLogger, get_transport
31
from bzrlib.transport import TransportLogger
32
from bzrlib.transport.local import LocalTransport
32
33
from bzrlib.transport.memory import MemoryTransport
33
34
from bzrlib.weave import Weave
41
42
factory = KnitPlainFactory()
44
return KnitVersionedFile('test', get_transport('.'), access_mode='w', factory=factory, create=True)
45
return KnitVersionedFile('test', LocalTransport('.'), access_mode='w', factory=factory, create=True)
47
48
class BasicKnitTests(KnitTests):
66
67
k = self.make_test_knit()
67
68
k.add_lines('text-1', [], split_lines(TEXT_1))
69
k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
70
k2 = KnitVersionedFile('test', LocalTransport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
70
71
self.assertTrue(k2.has_version('text-1'))
71
72
self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
94
95
def test_incomplete(self):
95
96
"""Test if texts without a ending line-end can be inserted and
97
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
98
k = KnitVersionedFile('test', LocalTransport('.'), delta=False, create=True)
98
99
k.add_lines('text-1', [], ['a\n', 'b' ])
99
100
k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
100
101
# reopening ensures maximum room for confusion
101
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
102
k = KnitVersionedFile('test', LocalTransport('.'), delta=False, create=True)
102
103
self.assertEquals(k.get_lines('text-1'), ['a\n', 'b' ])
103
104
self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
127
128
def test_add_delta(self):
128
129
"""Store in knit with parents"""
129
k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
130
k = KnitVersionedFile('test', LocalTransport('.'), factory=KnitPlainFactory(),
130
131
delta=True, create=True)
131
132
self.add_stock_one_and_one_a(k)
150
151
def test_annotate_fulltext(self):
151
152
"""Annotations"""
152
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
153
k = KnitVersionedFile('knit', LocalTransport('.'), factory=KnitAnnotateFactory(),
153
154
delta=False, create=True)
154
155
self.insert_and_test_small_annotate(k)
229
230
def test_knit_join(self):
230
231
"""Store in knit with parents"""
231
k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
232
k1 = KnitVersionedFile('test1', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
232
233
k1.add_lines('text-a', [], split_lines(TEXT_1))
233
234
k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
238
239
k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
240
k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
241
k2 = KnitVersionedFile('test2', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
241
242
count = k2.join(k1, version_ids=['text-m'])
242
243
self.assertEquals(count, 5)
243
244
self.assertTrue(k2.has_version('text-a'))
244
245
self.assertTrue(k2.has_version('text-c'))
246
247
def test_reannotate(self):
247
k1 = KnitVersionedFile('knit1', get_transport('.'),
248
k1 = KnitVersionedFile('knit1', LocalTransport('.'),
248
249
factory=KnitAnnotateFactory(), create=True)
250
251
k1.add_lines('text-a', [], ['a\n', 'b\n'])
252
253
k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
254
k2 = KnitVersionedFile('test2', get_transport('.'),
255
k2 = KnitVersionedFile('test2', LocalTransport('.'),
255
256
factory=KnitAnnotateFactory(), create=True)
256
257
k2.join(k1, version_ids=['text-b'])
272
273
self.assertEquals(origins[0], ('text-c', 'z\n'))
273
274
self.assertEquals(origins[1], ('text-b', 'c\n'))
275
def test_get_line_delta_texts(self):
276
"""Make sure we can call get_texts on text with reused line deltas"""
277
k1 = KnitVersionedFile('test1', get_transport('.'),
278
factory=KnitPlainFactory(), create=True)
283
parents = ['%d' % (t-1)]
284
k1.add_lines('%d' % t, parents, ['hello\n'] * t)
285
k1.get_texts(('%d' % t) for t in range(3))
276
def test_extraction_reads_components_once(self):
277
t = MemoryTransport()
278
instrumented_t = TransportLogger(t)
279
k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
280
# should read the index
281
self.assertEqual([('id.kndx',)], instrumented_t._calls)
282
instrumented_t._calls = []
284
k1.add_lines('base', [], ['text\n'])
285
# should not have read at all
286
self.assertEqual([], instrumented_t._calls)
289
k1.add_lines('sub', ['base'], ['text\n', 'text2\n'])
290
# should not have read at all
291
self.assertEqual([], instrumented_t._calls)
295
# should not have read at all
296
self.assertEqual([], instrumented_t._calls)
303
# should have read a component
304
# should not have read the first component only
305
self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
306
instrumented_t._calls = []
309
# should not have read at all
310
self.assertEqual([], instrumented_t._calls)
311
# and now read the other component
313
# should have read the second component
314
self.assertEqual([('id.knit', [(87, 93)])], instrumented_t._calls)
315
instrumented_t._calls = []
320
k1.add_lines('sub2', ['base'], ['text\n', 'text3\n'])
321
# should read the first component only
322
self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
287
324
def test_iter_lines_reads_in_order(self):
288
325
t = MemoryTransport()
326
363
"\nrevid2 line-delta 84 82 0 :",
328
365
# we should be able to load this file again
329
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
366
knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='r')
330
367
self.assertEqual(['revid', 'revid2'], knit.versions())
331
368
# write a short write to the file and ensure that its ignored
332
369
indexfile = file('test.kndx', 'at')
333
370
indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
334
371
indexfile.close()
335
372
# we should be able to load this file again
336
knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
373
knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='w')
337
374
self.assertEqual(['revid', 'revid2'], knit.versions())
338
375
# and add a revision with the same id the failed write had
339
376
knit.add_lines('revid3', ['revid2'], ['a\n'])
340
377
# and when reading it revid3 should now appear.
341
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
378
knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='r')
342
379
self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
343
380
self.assertEqual(['revid2'], knit.get_parents('revid3'))