19
19
from cStringIO import StringIO
22
from bzrlib.errors import BzrError, UnlistableStore
23
from bzrlib.store import copy_all
23
import bzrlib.errors as errors
24
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
24
25
from bzrlib.transport.local import LocalTransport
25
from bzrlib.transport import NoSuchFile
26
from bzrlib.store.compressed_text import CompressedTextStore
27
26
from bzrlib.store.text import TextStore
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
27
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
29
28
import bzrlib.store as store
29
import bzrlib.transactions as transactions
30
30
import bzrlib.transport as transport
31
31
from bzrlib.transport.memory import MemoryTransport
34
34
class TestStores(object):
35
"""Mixin template class that provides some common tests for stores"""
36
37
def check_content(self, store, fileid, value):
37
38
f = store.get(fileid)
50
51
store_a.add('foo', '1')
52
53
store_b = self.get_store('b')
53
copy_all(store_a, store_b)
54
store_b.copy_all_ids(store_a)
54
55
self.assertEqual(store_a.get('1').read(), 'foo')
55
56
self.assertEqual(store_b.get('1').read(), 'foo')
56
57
# TODO: Switch the exception from UnlistableStore to
80
81
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
82
def get_store(self, path='.'):
83
def get_store(self, path=u'.'):
83
84
t = LocalTransport(path)
84
return CompressedTextStore(t)
85
return TextStore(t, compressed=True)
86
87
def test_total_size(self):
87
store = self.get_store('.')
88
store = self.get_store(u'.')
88
89
store.register_suffix('dsc')
89
90
store.add(StringIO('goodbye'), '123123')
90
91
store.add(StringIO('goodbye2'), '123123', 'dsc')
92
93
self.assertEqual(store.total_size(), (2, 55))
94
95
def test__relpath_suffixed(self):
95
my_store = CompressedTextStore(MockTransport(), True)
96
my_store = TextStore(MockTransport(),
97
prefixed=True, compressed=True)
96
98
my_store.register_suffix('dsc')
97
self.assertEqual('45/foo.dsc.gz', my_store._relpath('foo', ['dsc']))
99
self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
100
102
class TestMemoryStore(TestCase):
139
141
class TestTextStore(TestCaseInTempDir, TestStores):
141
def get_store(self, path='.'):
143
def get_store(self, path=u'.'):
142
144
t = LocalTransport(path)
145
return TextStore(t, compressed=False)
145
147
def test_total_size(self):
146
148
store = self.get_store()
154
156
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
159
class TestMixedTextStore(TestCaseInTempDir, TestStores):
    """Exercise a compressed and an uncompressed TextStore over one directory.

    Both stores are built on a LocalTransport rooted at the same path, so
    each one has to cope with files written by the other.
    """

    def get_store(self, path=u'.', compressed=True):
        """Return a TextStore on a LocalTransport rooted at ``path``.

        :param path: directory handed to LocalTransport.
        :param compressed: forwarded unchanged to the TextStore constructor.
        """
        t = LocalTransport(path)
        return TextStore(t, compressed=compressed)

    def test_get_mixed(self):
        cs = self.get_store(u'.', compressed=True)
        s = self.get_store(u'.', compressed=False)
        cs.add(StringIO('hello there'), 'a')

        # The compressed store must have written only the gzipped file.
        self.failUnlessExists('a.gz')
        self.failIf(os.path.lexists('a'))

        # Close the handle explicitly instead of relying on garbage
        # collection; an open handle can block temp-dir cleanup.
        gz_file = gzip.GzipFile('a.gz')
        try:
            self.assertEquals(gz_file.read(), 'hello there')
        finally:
            gz_file.close()

        # Both stores see the id and return the same text for it.
        self.assertEquals(cs.has_id('a'), True)
        self.assertEquals(s.has_id('a'), True)
        self.assertEquals(cs.get('a').read(), 'hello there')
        self.assertEquals(s.get('a').read(), 'hello there')

        # Re-adding an id that already exists in compressed form is refused.
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')

        s.add(StringIO('goodbye'), 'b')
        # The uncompressed store must have written only the plain file.
        self.failUnlessExists('b')
        self.failIf(os.path.lexists('b.gz'))
        plain_file = open('b')
        try:
            self.assertEquals(plain_file.read(), 'goodbye')
        finally:
            plain_file.close()

        self.assertEquals(cs.has_id('b'), True)
        self.assertEquals(s.has_id('b'), True)
        self.assertEquals(cs.get('b').read(), 'goodbye')
        self.assertEquals(s.get('b').read(), 'goodbye')

        # ...and the same refusal applies in the other direction.
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
157
194
class MockTransport(transport.Transport):
158
195
"""A fake transport for testing with."""
232
269
def test__relpath_simple_suffixed(self):
233
270
my_store = store.TransportStore(MockTransport())
234
my_store.register_suffix('gz')
235
271
my_store.register_suffix('bar')
236
self.assertEqual('foo.gz', my_store._relpath('foo', ['gz']))
237
self.assertEqual('foo.gz.bar', my_store._relpath('foo', ['gz', 'bar']))
272
my_store.register_suffix('baz')
273
self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
274
self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
239
276
def test__relpath_prefixed_suffixed(self):
240
277
my_store = store.TransportStore(MockTransport(), True)
241
my_store.register_suffix('gz')
242
278
my_store.register_suffix('bar')
243
self.assertEqual('45/foo.gz', my_store._relpath('foo', ['gz']))
244
self.assertEqual('45/foo.gz.bar',
245
my_store._relpath('foo', ['gz', 'bar']))
279
my_store.register_suffix('baz')
280
self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
281
self.assertEqual('45/foo.bar.baz',
282
my_store._relpath('foo', ['bar', 'baz']))
247
284
def test_add_simple(self):
248
285
stream = StringIO("content")
270
307
my_store.add(stream, "foo", 'dsc')
271
308
self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
273
def get_populated_store(self, prefixed=False, store_class=TextStore):
274
my_store = store_class(MemoryTransport(), prefixed)
310
def get_populated_store(self, prefixed=False,
311
store_class=TextStore, compressed=False):
312
my_store = store_class(MemoryTransport(), prefixed,
313
compressed=compressed)
275
314
my_store.register_suffix('sig')
276
315
stream = StringIO("signature")
277
316
my_store.add(stream, "foo", 'sig')
320
359
my_store.get('missing', 'sig').read())
322
361
def test___iter__no_suffix(self):
323
my_store = TextStore(MemoryTransport(), False)
362
my_store = TextStore(MemoryTransport(),
363
prefixed=False, compressed=False)
324
364
stream = StringIO("content")
325
365
my_store.add(stream, "foo")
326
366
self.assertEqual(set(['foo']),
335
375
def test___iter__compressed(self):
336
376
self.assertEqual(set(['foo']),
337
377
set(self.get_populated_store(
338
store_class=CompressedTextStore).__iter__()))
378
compressed=True).__iter__()))
339
379
self.assertEqual(set(['foo']),
340
380
set(self.get_populated_store(
341
True, CompressedTextStore).__iter__()))
381
True, compressed=True).__iter__()))
343
383
def test___len__(self):
    # len() of the default populated store must be exactly one.
    populated = self.get_populated_store()
    entry_count = len(populated)
    self.assertEqual(1, entry_count)
346
386
def test_copy_suffixes(self):
347
387
from_store = self.get_populated_store()
348
to_store = CompressedTextStore(MemoryTransport(), True)
388
to_store = TextStore(MemoryTransport(),
389
prefixed=True, compressed=True)
349
390
to_store.register_suffix('sig')
350
copy_all(from_store, to_store)
391
to_store.copy_all_ids(from_store)
351
392
self.assertEqual(1, len(to_store))
352
393
self.assertEqual(set(['foo']), set(to_store.__iter__()))
353
394
self.assertEqual('content', to_store.get('foo').read())
357
398
def test_relpath_escaped(self):
    # A literal '%' in a file id must come back from _relpath as '%25'.
    escaping_store = store.TransportStore(MemoryTransport())
    relpath = escaping_store._relpath('%')
    self.assertEqual('%25', relpath)
403
class TestVersionFileStore(TestCaseWithTransport):
406
super(TestVersionFileStore, self).setUp()
407
self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
409
def test_get_weave_registers_dirty_in_write(self):
410
transaction = transactions.WriteTransaction()
411
vf = self.vfstore.get_weave_or_empty('id', transaction)
413
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
414
transaction = transactions.WriteTransaction()
415
vf = self.vfstore.get_weave('id', transaction)
417
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
419
def test_get_weave_or_empty_readonly_fails(self):
420
transaction = transactions.ReadOnlyTransaction()
421
vf = self.assertRaises(errors.ReadOnlyError,
422
self.vfstore.get_weave_or_empty,
426
def test_get_weave_readonly_cant_write(self):
427
transaction = transactions.WriteTransaction()
428
vf = self.vfstore.get_weave_or_empty('id', transaction)
430
transaction = transactions.ReadOnlyTransaction()
431
vf = self.vfstore.get_weave_or_empty('id', transaction)
432
self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])