1
# Copyright (C) 2005 by Canonical Ltd
4
# Johan Rydberg <jrydberg@gnu.org>
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
# GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22
import bzrlib.errors as errors
23
from bzrlib.errors import (
25
RevisionAlreadyPresent,
28
from bzrlib.knit import KnitVersionedFile, \
30
from bzrlib.tests import TestCaseWithTransport
31
from bzrlib.trace import mutter
32
from bzrlib.transport import get_transport
33
from bzrlib.transport.memory import MemoryTransport
34
import bzrlib.versionedfile as versionedfile
35
from bzrlib.weave import WeaveFile
36
from bzrlib.weavefile import read_weave
39
class VersionedFileTestMixIn(object):
40
"""A mixin test class for testing VersionedFiles.
42
This is not an adaptor-style test at this point because
43
theres no dynamic substitution of versioned file implementations,
44
they are strictly controlled by their owning repositories.
49
f.add_lines('r0', [], ['a\n', 'b\n'])
50
f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
52
versions = f.versions()
53
self.assertTrue('r0' in versions)
54
self.assertTrue('r1' in versions)
55
self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
56
self.assertEquals(f.get_text('r0'), 'a\nb\n')
57
self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
58
self.assertEqual(2, len(f))
59
self.assertEqual(2, f.num_versions())
61
self.assertRaises(RevisionNotPresent,
62
f.add_lines, 'r2', ['foo'], [])
63
self.assertRaises(RevisionAlreadyPresent,
64
f.add_lines, 'r1', [], [])
66
f = self.reopen_file()
69
def test_adds_with_parent_texts(self):
72
parent_texts['r0'] = f.add_lines('r0', [], ['a\n', 'b\n'])
74
parent_texts['r1'] = f.add_lines_with_ghosts('r1',
77
parent_texts=parent_texts)
78
except NotImplementedError:
79
# if the format doesn't support ghosts, just add normally.
80
parent_texts['r1'] = f.add_lines('r1',
83
parent_texts=parent_texts)
84
f.add_lines('r2', ['r1'], ['c\n', 'd\n'], parent_texts=parent_texts)
85
self.assertNotEqual(None, parent_texts['r0'])
86
self.assertNotEqual(None, parent_texts['r1'])
88
versions = f.versions()
89
self.assertTrue('r0' in versions)
90
self.assertTrue('r1' in versions)
91
self.assertTrue('r2' in versions)
92
self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
93
self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
94
self.assertEquals(f.get_lines('r2'), ['c\n', 'd\n'])
95
self.assertEqual(3, f.num_versions())
96
origins = f.annotate('r1')
97
self.assertEquals(origins[0][0], 'r0')
98
self.assertEquals(origins[1][0], 'r1')
99
origins = f.annotate('r2')
100
self.assertEquals(origins[0][0], 'r1')
101
self.assertEquals(origins[1][0], 'r2')
104
f = self.reopen_file()
107
def test_get_delta(self):
109
sha1s = self._setup_for_deltas(f)
110
expected_delta = (None, '6bfa09d82ce3e898ad4641ae13dd4fdb9cf0d76b', False,
111
[(0, 0, 1, [('base', 'line\n')])])
112
self.assertEqual(expected_delta, f.get_delta('base'))
114
text_name = 'chain1-'
115
for depth in range(26):
116
new_version = text_name + '%s' % depth
117
expected_delta = (next_parent, sha1s[depth],
119
[(depth + 1, depth + 1, 1, [(new_version, 'line\n')])])
120
self.assertEqual(expected_delta, f.get_delta(new_version))
121
next_parent = new_version
123
text_name = 'chain2-'
124
for depth in range(26):
125
new_version = text_name + '%s' % depth
126
expected_delta = (next_parent, sha1s[depth], False,
127
[(depth + 1, depth + 1, 1, [(new_version, 'line\n')])])
128
self.assertEqual(expected_delta, f.get_delta(new_version))
129
next_parent = new_version
130
# smoke test for eol support
131
expected_delta = ('base', '264f39cab871e4cfd65b3a002f7255888bb5ed97', True, [])
132
self.assertEqual(['line'], f.get_lines('noeol'))
133
self.assertEqual(expected_delta, f.get_delta('noeol'))
135
def test_get_deltas(self):
137
sha1s = self._setup_for_deltas(f)
138
deltas = f.get_deltas(f.versions())
139
expected_delta = (None, '6bfa09d82ce3e898ad4641ae13dd4fdb9cf0d76b', False,
140
[(0, 0, 1, [('base', 'line\n')])])
141
self.assertEqual(expected_delta, deltas['base'])
143
text_name = 'chain1-'
144
for depth in range(26):
145
new_version = text_name + '%s' % depth
146
expected_delta = (next_parent, sha1s[depth],
148
[(depth + 1, depth + 1, 1, [(new_version, 'line\n')])])
149
self.assertEqual(expected_delta, deltas[new_version])
150
next_parent = new_version
152
text_name = 'chain2-'
153
for depth in range(26):
154
new_version = text_name + '%s' % depth
155
expected_delta = (next_parent, sha1s[depth], False,
156
[(depth + 1, depth + 1, 1, [(new_version, 'line\n')])])
157
self.assertEqual(expected_delta, deltas[new_version])
158
next_parent = new_version
159
# smoke tests for eol support
160
expected_delta = ('base', '264f39cab871e4cfd65b3a002f7255888bb5ed97', True, [])
161
self.assertEqual(['line'], f.get_lines('noeol'))
162
self.assertEqual(expected_delta, deltas['noeol'])
163
# smoke tests for eol support - two noeol in a row same content
164
expected_deltas = (('noeol', '3ad7ee82dbd8f29ecba073f96e43e414b3f70a4d', True,
165
[(0, 1, 2, [(u'noeolsecond', 'line\n'), (u'noeolsecond', 'line\n')])]),
166
('noeol', '3ad7ee82dbd8f29ecba073f96e43e414b3f70a4d', True,
167
[(0, 0, 1, [('noeolsecond', 'line\n')]), (1, 1, 0, [])]))
168
self.assertEqual(['line\n', 'line'], f.get_lines('noeolsecond'))
169
self.assertTrue(deltas['noeolsecond'] in expected_deltas)
170
# two no-eol in a row, different content
171
expected_delta = ('noeolsecond', '8bb553a84e019ef1149db082d65f3133b195223b', True,
172
[(1, 2, 1, [(u'noeolnotshared', 'phone\n')])])
173
self.assertEqual(['line\n', 'phone'], f.get_lines('noeolnotshared'))
174
self.assertEqual(expected_delta, deltas['noeolnotshared'])
175
# eol following a no-eol with content change
176
expected_delta = ('noeol', 'a61f6fb6cfc4596e8d88c34a308d1e724caf8977', False,
177
[(0, 1, 1, [(u'eol', 'phone\n')])])
178
self.assertEqual(['phone\n'], f.get_lines('eol'))
179
self.assertEqual(expected_delta, deltas['eol'])
180
# eol following a no-eol with content change
181
expected_delta = ('noeol', '6bfa09d82ce3e898ad4641ae13dd4fdb9cf0d76b', False,
182
[(0, 1, 1, [(u'eolline', 'line\n')])])
183
self.assertEqual(['line\n'], f.get_lines('eolline'))
184
self.assertEqual(expected_delta, deltas['eolline'])
185
# eol with no parents
186
expected_delta = (None, '264f39cab871e4cfd65b3a002f7255888bb5ed97', True,
187
[(0, 0, 1, [(u'noeolbase', 'line\n')])])
188
self.assertEqual(['line'], f.get_lines('noeolbase'))
189
self.assertEqual(expected_delta, deltas['noeolbase'])
190
# eol with two parents, in inverse insertion order
191
expected_deltas = (('noeolbase', '264f39cab871e4cfd65b3a002f7255888bb5ed97', True,
192
[(0, 1, 1, [(u'eolbeforefirstparent', 'line\n')])]),
193
('noeolbase', '264f39cab871e4cfd65b3a002f7255888bb5ed97', True,
194
[(0, 1, 1, [(u'eolbeforefirstparent', 'line\n')])]))
195
self.assertEqual(['line'], f.get_lines('eolbeforefirstparent'))
196
#self.assertTrue(deltas['eolbeforefirstparent'] in expected_deltas)
198
def _setup_for_deltas(self, f):
199
self.assertRaises(errors.RevisionNotPresent, f.get_delta, 'base')
200
# add texts that should trip the knit maximum delta chain threshold
201
# as well as doing parallel chains of data in knits.
202
# this is done by two chains of 25 insertions
203
f.add_lines('base', [], ['line\n'])
204
f.add_lines('noeol', ['base'], ['line'])
205
# detailed eol tests:
206
# shared last line with parent no-eol
207
f.add_lines('noeolsecond', ['noeol'], ['line\n', 'line'])
208
# differing last line with parent, both no-eol
209
f.add_lines('noeolnotshared', ['noeolsecond'], ['line\n', 'phone'])
210
# add eol following a noneol parent, change content
211
f.add_lines('eol', ['noeol'], ['phone\n'])
212
# add eol following a noneol parent, no change content
213
f.add_lines('eolline', ['noeol'], ['line\n'])
214
# noeol with no parents:
215
f.add_lines('noeolbase', [], ['line'])
216
# noeol preceding its leftmost parent in the output:
217
# this is done by making it a merge of two parents with no common
218
# ancestry: noeolbase and noeol with the
219
# later-inserted parent the leftmost.
220
f.add_lines('eolbeforefirstparent', ['noeolbase', 'noeol'], ['line'])
221
# two identical eol texts
222
f.add_lines('noeoldup', ['noeol'], ['line'])
224
text_name = 'chain1-'
226
sha1s = {0 :'da6d3141cb4a5e6f464bf6e0518042ddc7bfd079',
227
1 :'45e21ea146a81ea44a821737acdb4f9791c8abe7',
228
2 :'e1f11570edf3e2a070052366c582837a4fe4e9fa',
229
3 :'26b4b8626da827088c514b8f9bbe4ebf181edda1',
230
4 :'e28a5510be25ba84d31121cff00956f9970ae6f6',
231
5 :'d63ec0ce22e11dcf65a931b69255d3ac747a318d',
232
6 :'2c2888d288cb5e1d98009d822fedfe6019c6a4ea',
233
7 :'95c14da9cafbf828e3e74a6f016d87926ba234ab',
234
8 :'779e9a0b28f9f832528d4b21e17e168c67697272',
235
9 :'1f8ff4e5c6ff78ac106fcfe6b1e8cb8740ff9a8f',
236
10:'131a2ae712cf51ed62f143e3fbac3d4206c25a05',
237
11:'c5a9d6f520d2515e1ec401a8f8a67e6c3c89f199',
238
12:'31a2286267f24d8bedaa43355f8ad7129509ea85',
239
13:'dc2a7fe80e8ec5cae920973973a8ee28b2da5e0a',
240
14:'2c4b1736566b8ca6051e668de68650686a3922f2',
241
15:'5912e4ecd9b0c07be4d013e7e2bdcf9323276cde',
242
16:'b0d2e18d3559a00580f6b49804c23fea500feab3',
243
17:'8e1d43ad72f7562d7cb8f57ee584e20eb1a69fc7',
244
18:'5cf64a3459ae28efa60239e44b20312d25b253f3',
245
19:'1ebed371807ba5935958ad0884595126e8c4e823',
246
20:'2aa62a8b06fb3b3b892a3292a068ade69d5ee0d3',
247
21:'01edc447978004f6e4e962b417a4ae1955b6fe5d',
248
22:'d8d8dc49c4bf0bab401e0298bb5ad827768618bb',
249
23:'c21f62b1c482862983a8ffb2b0c64b3451876e3f',
250
24:'c0593fe795e00dff6b3c0fe857a074364d5f04fc',
251
25:'dd1a1cf2ba9cc225c3aff729953e6364bf1d1855',
253
for depth in range(26):
254
new_version = text_name + '%s' % depth
255
text = text + ['line\n']
256
f.add_lines(new_version, [next_parent], text)
257
next_parent = new_version
259
text_name = 'chain2-'
261
for depth in range(26):
262
new_version = text_name + '%s' % depth
263
text = text + ['line\n']
264
f.add_lines(new_version, [next_parent], text)
265
next_parent = new_version
268
def test_add_delta(self):
269
# tests for the add-delta facility.
270
# at this point, optimising for speed, we assume no checks when deltas are inserted.
271
# this may need to be revisited.
272
source = self.get_file('source')
273
source.add_lines('base', [], ['line\n'])
275
text_name = 'chain1-'
277
for depth in range(26):
278
new_version = text_name + '%s' % depth
279
text = text + ['line\n']
280
source.add_lines(new_version, [next_parent], text)
281
next_parent = new_version
283
text_name = 'chain2-'
285
for depth in range(26):
286
new_version = text_name + '%s' % depth
287
text = text + ['line\n']
288
source.add_lines(new_version, [next_parent], text)
289
next_parent = new_version
290
source.add_lines('noeol', ['base'], ['line'])
292
target = self.get_file('target')
293
for version in source.versions():
294
parent, sha1, noeol, delta = source.get_delta(version)
295
target.add_delta(version,
296
source.get_parents(version),
301
self.assertRaises(RevisionAlreadyPresent,
302
target.add_delta, 'base', [], None, '', False, [])
303
for version in source.versions():
304
self.assertEqual(source.get_lines(version),
305
target.get_lines(version))
307
def test_ancestry(self):
309
self.assertEqual([], f.get_ancestry([]))
310
f.add_lines('r0', [], ['a\n', 'b\n'])
311
f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
312
f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
313
f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
314
f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])
315
self.assertEqual([], f.get_ancestry([]))
316
versions = f.get_ancestry(['rM'])
317
# there are some possibilities:
321
# so we check indexes
322
r0 = versions.index('r0')
323
r1 = versions.index('r1')
324
r2 = versions.index('r2')
325
self.assertFalse('r3' in versions)
326
rM = versions.index('rM')
327
self.assertTrue(r0 < r1)
328
self.assertTrue(r0 < r2)
329
self.assertTrue(r1 < rM)
330
self.assertTrue(r2 < rM)
332
self.assertRaises(RevisionNotPresent,
333
f.get_ancestry, ['rM', 'rX'])
335
def test_mutate_after_finish(self):
337
f.transaction_finished()
338
self.assertRaises(errors.OutSideTransaction, f.add_delta, '', [], '', '', False, [])
339
self.assertRaises(errors.OutSideTransaction, f.add_lines, '', [], [])
340
self.assertRaises(errors.OutSideTransaction, f.add_lines_with_ghosts, '', [], [])
341
self.assertRaises(errors.OutSideTransaction, f.fix_parents, '', [])
342
self.assertRaises(errors.OutSideTransaction, f.join, '')
343
self.assertRaises(errors.OutSideTransaction, f.clone_text, 'base', 'bar', ['foo'])
345
def test_clear_cache(self):
347
# on a new file it should not error
349
# and after adding content, doing a clear_cache and a get should work.
350
f.add_lines('0', [], ['a'])
352
self.assertEqual(['a'], f.get_lines('0'))
354
def test_clone_text(self):
356
f.add_lines('r0', [], ['a\n', 'b\n'])
357
f.clone_text('r1', 'r0', ['r0'])
359
self.assertEquals(f.get_lines('r1'), f.get_lines('r0'))
360
self.assertEquals(f.get_lines('r1'), ['a\n', 'b\n'])
361
self.assertEquals(f.get_parents('r1'), ['r0'])
363
self.assertRaises(RevisionNotPresent,
364
f.clone_text, 'r2', 'rX', [])
365
self.assertRaises(RevisionAlreadyPresent,
366
f.clone_text, 'r1', 'r0', [])
368
verify_file(self.reopen_file())
370
def test_create_empty(self):
372
f.add_lines('0', [], ['a\n'])
373
new_f = f.create_empty('t', MemoryTransport())
374
# smoke test, specific types should check it is honoured correctly for
375
# non type attributes
376
self.assertEqual([], new_f.versions())
377
self.assertTrue(isinstance(new_f, f.__class__))
379
def test_copy_to(self):
381
f.add_lines('0', [], ['a\n'])
382
t = MemoryTransport()
384
for suffix in f.__class__.get_suffixes():
385
self.assertTrue(t.has('foo' + suffix))
387
def test_get_suffixes(self):
390
self.assertEqual(f.__class__.get_suffixes(), f.__class__.get_suffixes())
391
# and should be a list
392
self.assertTrue(isinstance(f.__class__.get_suffixes(), list))
394
def test_get_graph(self):
396
f.add_lines('v1', [], ['hello\n'])
397
f.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
398
f.add_lines('v3', ['v2'], ['hello\n', 'cruel\n', 'world\n'])
399
self.assertEqual({'v1': [],
404
def test_get_parents(self):
406
f.add_lines('r0', [], ['a\n', 'b\n'])
407
f.add_lines('r1', [], ['a\n', 'b\n'])
408
f.add_lines('r2', [], ['a\n', 'b\n'])
409
f.add_lines('r3', [], ['a\n', 'b\n'])
410
f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
411
self.assertEquals(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])
413
self.assertRaises(RevisionNotPresent,
416
def test_annotate(self):
418
f.add_lines('r0', [], ['a\n', 'b\n'])
419
f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
420
origins = f.annotate('r1')
421
self.assertEquals(origins[0][0], 'r1')
422
self.assertEquals(origins[1][0], 'r0')
424
self.assertRaises(RevisionNotPresent,
428
# tests that walk returns all the inclusions for the requested
429
# revisions as well as the revisions changes themselves.
430
f = self.get_file('1')
431
f.add_lines('r0', [], ['a\n', 'b\n'])
432
f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
433
f.add_lines('rX', ['r1'], ['d\n', 'b\n'])
434
f.add_lines('rY', ['r1'], ['c\n', 'e\n'])
437
for lineno, insert, dset, text in f.walk(['rX', 'rY']):
438
lines[text] = (insert, dset)
440
self.assertTrue(lines['a\n'], ('r0', set(['r1'])))
441
self.assertTrue(lines['b\n'], ('r0', set(['rY'])))
442
self.assertTrue(lines['c\n'], ('r1', set(['rX'])))
443
self.assertTrue(lines['d\n'], ('rX', set([])))
444
self.assertTrue(lines['e\n'], ('rY', set([])))
446
def test_detection(self):
447
# Test weaves detect corruption.
449
# Weaves contain a checksum of their texts.
450
# When a text is extracted, this checksum should be
453
w = self.get_file_corrupted_text()
455
self.assertEqual('hello\n', w.get_text('v1'))
456
self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
457
self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
458
self.assertRaises(errors.WeaveInvalidChecksum, w.check)
460
w = self.get_file_corrupted_checksum()
462
self.assertEqual('hello\n', w.get_text('v1'))
463
self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
464
self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
465
self.assertRaises(errors.WeaveInvalidChecksum, w.check)
467
def get_file_corrupted_text(self):
468
"""Return a versioned file with corrupt text but valid metadata."""
469
# Abstract hook: the way to corrupt stored text is format-specific,
# so each concrete TestCase subclass must supply its own implementation.
raise NotImplementedError(self.get_file_corrupted_text)
471
def reopen_file(self, name='foo'):
472
"""Open the versioned file from disk again."""
473
# Abstract hook: re-opening depends on the storage format, so each
# concrete TestCase subclass must override this with its own factory.
raise NotImplementedError(self.reopen_file)
475
def test_iter_lines_added_or_present_in_versions(self):
476
# test that we get at least an equalset of the lines added by
477
# versions in the weave
478
# the ordering here is to make a tree so that dumb searches have
479
# more changes to muck up.
481
# add a base to get included
482
vf.add_lines('base', [], ['base\n'])
483
# add an ancestor to be included on one side
484
vf.add_lines('lancestor', [], ['lancestor\n'])
485
# add an ancestor to be included on the other side
486
vf.add_lines('rancestor', ['base'], ['rancestor\n'])
487
# add a child of rancestor with no eofile-nl
488
vf.add_lines('child', ['rancestor'], ['base\n', 'child\n'])
489
# add a child of lancestor and base to join the two roots
490
vf.add_lines('otherchild',
491
['lancestor', 'base'],
492
['base\n', 'lancestor\n', 'otherchild\n'])
493
def iter_with_versions(versions):
494
# now we need to see what lines are returned, and how often.
501
# iterate over the lines
502
for line in vf.iter_lines_added_or_present_in_versions(versions):
505
lines = iter_with_versions(['child', 'otherchild'])
506
# we must see child and otherchild
507
self.assertTrue(lines['child\n'] > 0)
508
self.assertTrue(lines['otherchild\n'] > 0)
509
# we dont care if we got more than that.
512
lines = iter_with_versions(None)
513
# all lines must be seen at least once
514
self.assertTrue(lines['base\n'] > 0)
515
self.assertTrue(lines['lancestor\n'] > 0)
516
self.assertTrue(lines['rancestor\n'] > 0)
517
self.assertTrue(lines['child\n'] > 0)
518
self.assertTrue(lines['otherchild\n'] > 0)
520
def test_fix_parents(self):
521
# some versioned files allow incorrect parents to be corrected after
522
# insertion - this may not fix ancestry..
523
# if they do not supported, they just do not implement it.
524
# we test this as an interface test to ensure that those that *do*
525
# implement it get it right.
527
vf.add_lines('notbase', [], [])
528
vf.add_lines('base', [], [])
530
vf.fix_parents('notbase', ['base'])
531
except NotImplementedError:
533
self.assertEqual(['base'], vf.get_parents('notbase'))
534
# open again, check it stuck.
536
self.assertEqual(['base'], vf.get_parents('notbase'))
538
def test_fix_parents_with_ghosts(self):
539
# when fixing parents, ghosts that are listed should not be ghosts
544
vf.add_lines_with_ghosts('notbase', ['base', 'stillghost'], [])
545
except NotImplementedError:
547
vf.add_lines('base', [], [])
548
vf.fix_parents('notbase', ['base', 'stillghost'])
549
self.assertEqual(['base'], vf.get_parents('notbase'))
550
# open again, check it stuck.
552
self.assertEqual(['base'], vf.get_parents('notbase'))
553
# and check the ghosts
554
self.assertEqual(['base', 'stillghost'],
555
vf.get_parents_with_ghosts('notbase'))
557
def test_add_lines_with_ghosts(self):
558
# some versioned file formats allow lines to be added with parent
559
# information that is > than that in the format. Formats that do
560
# not support this need to raise NotImplementedError on the
561
# add_lines_with_ghosts api.
563
# add a revision with ghost parents
565
vf.add_lines_with_ghosts(u'notbxbfse', [u'b\xbfse'], [])
566
except NotImplementedError:
567
# check the other ghost apis are also not implemented
568
self.assertRaises(NotImplementedError, vf.has_ghost, 'foo')
569
self.assertRaises(NotImplementedError, vf.get_ancestry_with_ghosts, ['foo'])
570
self.assertRaises(NotImplementedError, vf.get_parents_with_ghosts, 'foo')
571
self.assertRaises(NotImplementedError, vf.get_graph_with_ghosts)
573
# test key graph related apis: get_ancestry, _graph, get_parents
575
# - these are ghost unaware and must not reflect ghosts
576
self.assertEqual([u'notbxbfse'], vf.get_ancestry(u'notbxbfse'))
577
self.assertEqual([], vf.get_parents(u'notbxbfse'))
578
self.assertEqual({u'notbxbfse':[]}, vf.get_graph())
579
self.assertFalse(vf.has_version(u'b\xbfse'))
580
# we have _with_ghost apis to give us ghost information.
581
self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry_with_ghosts([u'notbxbfse']))
582
self.assertEqual([u'b\xbfse'], vf.get_parents_with_ghosts(u'notbxbfse'))
583
self.assertEqual({u'notbxbfse':[u'b\xbfse']}, vf.get_graph_with_ghosts())
584
self.assertTrue(vf.has_ghost(u'b\xbfse'))
585
# if we add something that is a ghost of another, it should correct the
586
# results of the prior apis
587
vf.add_lines(u'b\xbfse', [], [])
588
self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry([u'notbxbfse']))
589
self.assertEqual([u'b\xbfse'], vf.get_parents(u'notbxbfse'))
590
self.assertEqual({u'b\xbfse':[],
591
u'notbxbfse':[u'b\xbfse'],
594
self.assertTrue(vf.has_version(u'b\xbfse'))
595
# we have _with_ghost apis to give us ghost information.
596
self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry_with_ghosts([u'notbxbfse']))
597
self.assertEqual([u'b\xbfse'], vf.get_parents_with_ghosts(u'notbxbfse'))
598
self.assertEqual({u'b\xbfse':[],
599
u'notbxbfse':[u'b\xbfse'],
601
vf.get_graph_with_ghosts())
602
self.assertFalse(vf.has_ghost(u'b\xbfse'))
604
def test_add_lines_with_ghosts_after_normal_revs(self):
605
# some versioned file formats allow lines to be added with parent
606
# information that is > than that in the format. Formats that do
607
# not support this need to raise NotImplementedError on the
608
# add_lines_with_ghosts api.
610
# probe for ghost support
613
except NotImplementedError:
615
vf.add_lines_with_ghosts('base', [], ['line\n', 'line_b\n'])
616
vf.add_lines_with_ghosts('references_ghost',
618
['line\n', 'line_b\n', 'line_c\n'])
619
origins = vf.annotate('references_ghost')
620
self.assertEquals(('base', 'line\n'), origins[0])
621
self.assertEquals(('base', 'line_b\n'), origins[1])
622
self.assertEquals(('references_ghost', 'line_c\n'), origins[2])
624
def test_readonly_mode(self):
625
transport = get_transport(self.get_url('.'))
626
factory = self.get_factory()
627
vf = factory('id', transport, 0777, create=True, access_mode='w')
628
vf = factory('id', transport, access_mode='r')
629
self.assertRaises(errors.ReadOnlyError, vf.add_delta, '', [], '', '', False, [])
630
self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'base', [], [])
631
self.assertRaises(errors.ReadOnlyError,
632
vf.add_lines_with_ghosts,
636
self.assertRaises(errors.ReadOnlyError, vf.fix_parents, 'base', [])
637
self.assertRaises(errors.ReadOnlyError, vf.join, 'base')
638
self.assertRaises(errors.ReadOnlyError, vf.clone_text, 'base', 'bar', ['foo'])
641
class TestWeave(TestCaseWithTransport, VersionedFileTestMixIn):
def get_file(self, name='foo'):
    """Create a fresh WeaveFile at the test URL and return it."""
    transport = get_transport(self.get_url('.'))
    return WeaveFile(name, transport, create=True)
646
def get_file_corrupted_text(self):
647
w = WeaveFile('foo', get_transport(self.get_url('.')), create=True)
648
w.add_lines('v1', [], ['hello\n'])
649
w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
651
# We are going to invasively corrupt the text
652
# Make sure the internals of weave are the same
653
self.assertEqual([('{', 0)
661
self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
662
, '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
667
w._weave[4] = 'There\n'
670
def get_file_corrupted_checksum(self):
671
w = self.get_file_corrupted_text()
673
w._weave[4] = 'there\n'
674
self.assertEqual('hello\nthere\n', w.get_text('v2'))
676
#Invalid checksum, first digit changed
677
w._sha1s[1] = 'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
def reopen_file(self, name='foo'):
    """Open an already-created WeaveFile from disk (no create)."""
    transport = get_transport(self.get_url('.'))
    return WeaveFile(name, transport)
683
def test_no_implicit_create(self):
684
self.assertRaises(errors.NoSuchFile,
687
get_transport(self.get_url('.')))
689
def get_factory(self):
693
class TestKnit(TestCaseWithTransport, VersionedFileTestMixIn):
def get_file(self, name='foo'):
    """Create a fresh delta-compressed KnitVersionedFile for the test."""
    transport = get_transport(self.get_url('.'))
    return KnitVersionedFile(name, transport, delta=True, create=True)
def get_factory(self):
    """Return the versioned-file class exercised by this TestCase."""
    return KnitVersionedFile
702
def get_file_corrupted_text(self):
703
knit = self.get_file()
704
knit.add_lines('v1', [], ['hello\n'])
705
knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
def reopen_file(self, name='foo'):
    """Re-open an existing knit from disk in delta mode (no create)."""
    transport = get_transport(self.get_url('.'))
    return KnitVersionedFile(name, transport, delta=True)
711
def test_detection(self):
712
print "TODO for merging: create a corrupted knit."
713
knit = self.get_file()
716
def test_no_implicit_create(self):
717
self.assertRaises(errors.NoSuchFile,
720
get_transport(self.get_url('.')))
723
class InterString(versionedfile.InterVersionedFile):
724
"""An inter-versionedfile optimised code path for strings.
726
This is for use during testing where we use strings as versionedfiles
727
so that none of the default registered interversionedfile classes will
728
match - which lets us test the match logic.
732
def is_compatible(source, target):
733
"""InterString is compatible with strings-as-versionedfiles."""
734
return isinstance(source, str) and isinstance(target, str)
737
# TODO this and the InterRepository core logic should be consolidatable
738
# if we make the registry a separate class though we still need to
739
# test the behaviour in the active registry to catch failure-to-handle-
741
class TestInterVersionedFile(TestCaseWithTransport):
743
def test_get_default_inter_versionedfile(self):
744
# test that the InterVersionedFile.get(a, b) probes
745
# for a class where is_compatible(a, b) returns
746
# true and returns a default interversionedfile otherwise.
747
# This also tests that the default registered optimised interversionedfile
748
# classes do not barf inappropriately when a surprising versionedfile type
750
dummy_a = "VersionedFile 1."
751
dummy_b = "VersionedFile 2."
752
self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
754
def assertGetsDefaultInterVersionedFile(self, a, b):
755
"""Asserts that InterVersionedFile.get(a, b) -> the default."""
756
inter = versionedfile.InterVersionedFile.get(a, b)
757
self.assertEqual(versionedfile.InterVersionedFile,
759
self.assertEqual(a, inter.source)
760
self.assertEqual(b, inter.target)
762
def test_register_inter_versionedfile_class(self):
763
# test that a optimised code path provider - a
764
# InterVersionedFile subclass can be registered and unregistered
765
# and that it is correctly selected when given a versionedfile
766
# pair that it returns true on for the is_compatible static method
768
dummy_a = "VersionedFile 1."
769
dummy_b = "VersionedFile 2."
770
versionedfile.InterVersionedFile.register_optimiser(InterString)
772
# we should get the default for something InterString returns False
774
self.assertFalse(InterString.is_compatible(dummy_a, None))
775
self.assertGetsDefaultInterVersionedFile(dummy_a, None)
776
# and we should get an InterString for a pair it 'likes'
777
self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
778
inter = versionedfile.InterVersionedFile.get(dummy_a, dummy_b)
779
self.assertEqual(InterString, inter.__class__)
780
self.assertEqual(dummy_a, inter.source)
781
self.assertEqual(dummy_b, inter.target)
783
versionedfile.InterVersionedFile.unregister_optimiser(InterString)
784
# now we should get the default InterVersionedFile object again.
785
self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)