59
59
they are strictly controlled by their owning repositories.
62
def get_transaction(self):
    """Return the value currently stored in ``self._transaction``.

    The attribute is created as ``None`` on first access, so this method
    can be called (or handed to a file constructor as its ``get_scope``
    callback) before any test has assigned a transaction value.
    """
    if not hasattr(self, '_transaction'):
        # Lazily initialise: nothing has set a transaction yet.
        self._transaction = None
    return self._transaction
62
67
def test_add(self):
63
68
f = self.get_file()
64
69
f.add_lines('r0', [], ['a\n', 'b\n'])
348
353
set(f.get_ancestry('rM', topo_sorted=False)))
350
355
def test_mutate_after_finish(self):
356
self._transaction = 'before'
351
357
f = self.get_file()
352
f.transaction_finished()
358
self._transaction = 'after'
353
359
self.assertRaises(errors.OutSideTransaction, f.add_lines, '', [], [])
354
360
self.assertRaises(errors.OutSideTransaction, f.add_lines_with_ghosts, '', [], [])
355
361
self.assertRaises(errors.OutSideTransaction, f.join, '')
384
390
f.add_lines('0', [], ['a\n'])
385
391
t = MemoryTransport()
386
392
f.copy_to('foo', t)
387
for suffix in f.__class__.get_suffixes():
393
for suffix in self.get_factory().get_suffixes():
388
394
self.assertTrue(t.has('foo' + suffix))
390
396
def test_get_suffixes(self):
    # Build a file first, as the test has always done; the object itself
    # is not inspected below.
    f = self.get_file()
    # and should be a list
    self.assertTrue(isinstance(self.get_factory().get_suffixes(), list))
397
401
def build_graph(self, file, graph):
398
402
for node in topo_sort(graph.items()):
720
724
class TestWeave(TestCaseWithMemoryTransport, VersionedFileTestMixIn):
722
726
def get_file(self, name='foo'):
    """Create and return a WeaveFile called `name` for this test.

    The file is opened with create=True on the transport for the test's
    base URL, and get_transaction is supplied as its get_scope callback.
    """
    transport = get_transport(self.get_url('.'))
    return WeaveFile(name, transport, create=True,
                     get_scope=self.get_transaction)
725
730
def get_file_corrupted_text(self):
726
w = WeaveFile('foo', get_transport(self.get_url('.')), create=True)
731
w = WeaveFile('foo', get_transport(self.get_url('.')), create=True,
732
get_scope=self.get_transaction)
727
733
w.add_lines('v1', [], ['hello\n'])
728
734
w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
759
765
def reopen_file(self, name='foo', create=False):
    """Open the WeaveFile called `name` on this test's transport.

    :param name: name of the weave to open.
    :param create: passed straight through to WeaveFile; defaults to
        False so that an existing file is reopened rather than created.
    """
    transport = get_transport(self.get_url('.'))
    return WeaveFile(name, transport, create=create,
                     get_scope=self.get_transaction)
762
769
def test_no_implicit_create(self):
763
770
self.assertRaises(errors.NoSuchFile,
766
get_transport(self.get_url('.')))
773
get_transport(self.get_url('.')),
774
get_scope=self.get_transaction)
768
776
def get_factory(self):
774
782
def get_file(self, name='foo'):
    """Create and return a versioned file called `name` via get_factory().

    The factory is called with delta=True and create=True on the
    transport for the test's base URL, with get_transaction supplied as
    the get_scope callback.
    """
    factory = self.get_factory()
    transport = get_transport(self.get_url('.'))
    return factory(name, transport, delta=True, create=True,
                   get_scope=self.get_transaction)
778
786
def get_factory(self):
    """Return the callable used to construct the file under test."""
    return make_file_knit
781
789
def get_file_corrupted_text(self):
782
790
knit = self.get_file()
796
804
def test_no_implicit_create(self):
    # Calling the factory for 'foo' without passing create must raise
    # NoSuchFile rather than silently creating the file.
    self.assertRaises(errors.NoSuchFile, self.get_factory(), 'foo',
                      get_transport(self.get_url('.')))
803
809
class TestPlaintextKnit(TestKnit):
    """Test a knit with no cached annotations"""

    def get_factory(self):
        """Return the callable used to construct the file under test."""
        # NOTE(review): this is the same make_file_knit callable that the
        # TestKnit.get_factory visible in this file returns -- confirm it
        # still exercises an unannotated (plain) knit as the class
        # docstring says.
        return make_file_knit
816
816
class TestPlanMergeVersionedFile(TestCaseWithMemoryTransport):
819
819
TestCaseWithMemoryTransport.setUp(self)
820
self.vf1 = KnitVersionedFile('root', self.get_transport(), create=True)
821
self.vf2 = KnitVersionedFile('root', self.get_transport(), create=True)
820
self.vf1 = make_file_knit('root', self.get_transport(), create=True)
821
self.vf2 = make_file_knit('root', self.get_transport(), create=True)
822
822
self.plan_merge_vf = versionedfile._PlanMergeVersionedFile('root',
823
823
[self.vf1, self.vf2])
964
967
class TestWeaveHTTP(TestCaseWithWebserver, TestReadonlyHttpMixin):
966
969
def get_file(self):
    """Create and return the WeaveFile 'foo' on this test's transport.

    get_transaction is supplied as the file's get_scope callback.
    """
    transport = get_transport(self.get_url('.'))
    return WeaveFile('foo', transport, create=True,
                     get_scope=self.get_transaction)
969
973
def get_factory(self):
973
977
class TestKnitHTTP(TestCaseWithWebserver, TestReadonlyHttpMixin):
975
979
def get_file(self):
    """Create and return the knit 'foo' on this test's transport.

    The knit is built by make_file_knit with delta=True and create=True,
    and get_transaction is supplied as its get_scope callback.
    """
    transport = get_transport(self.get_url('.'))
    return make_file_knit('foo', transport, delta=True, create=True,
                          get_scope=self.get_transaction)
979
983
def get_factory(self):
    """Return the callable used to construct the file under test."""
    return make_file_knit
983
987
class MergeCasesMixin(object):
1220
1224
class TestKnitMerge(TestCaseWithMemoryTransport, MergeCasesMixin):
1222
1226
def get_file(self, name='foo'):
    """Create and return a knit called `name` for the merge tests.

    The knit is built by make_file_knit with delta=True and create=True
    on the transport for the test's base URL.
    """
    transport = get_transport(self.get_url('.'))
    return make_file_knit(name, transport, delta=True, create=True)
1226
1230
def log_contents(self, w):
1241
1245
overlappedInsertExpected = ['aaa', '<<<<<<< ', 'xxx', 'yyy', '=======',
1242
1246
'xxx', '>>>>>>> ', 'bbb']
1245
class TestFormatSignatures(TestCaseWithMemoryTransport):
1247
def get_knit_file(self, name, annotated):
1249
factory = KnitAnnotateFactory()
1251
factory = KnitPlainFactory()
1252
return KnitVersionedFile(
1253
name, get_transport(self.get_url('.')), create=True,
1256
def test_knit_format_signatures(self):
1257
"""Different formats of knit have different signature strings."""
1258
knit = self.get_knit_file('a', True)
1259
self.assertEqual('knit-annotated', knit.get_format_signature())
1260
knit = self.get_knit_file('p', False)
1261
self.assertEqual('knit-plain', knit.get_format_signature())