19
19
from cStringIO import StringIO
23
import bzrlib.errors as errors
24
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
22
from bzrlib.errors import BzrError, UnlistableStore
23
from bzrlib.store import copy_all
25
24
from bzrlib.transport.local import LocalTransport
25
from bzrlib.transport import NoSuchFile
26
from bzrlib.store.compressed_text import CompressedTextStore
26
27
from bzrlib.store.text import TextStore
27
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
28
29
import bzrlib.store as store
29
import bzrlib.transactions as transactions
30
30
import bzrlib.transport as transport
31
31
from bzrlib.transport.memory import MemoryTransport
34
34
class TestStores(object):
35
"""Mixin template class that provides some common tests for stores"""
37
36
def check_content(self, store, fileid, value):
38
37
f = store.get(fileid)
81
80
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
83
def get_store(self, path=u'.'):
82
def get_store(self, path='.'):
84
83
t = LocalTransport(path)
85
return TextStore(t, compressed=True)
84
return CompressedTextStore(t)
87
86
def test_total_size(self):
88
store = self.get_store(u'.')
87
store = self.get_store('.')
89
88
store.register_suffix('dsc')
90
89
store.add(StringIO('goodbye'), '123123')
91
90
store.add(StringIO('goodbye2'), '123123', 'dsc')
93
92
self.assertEqual(store.total_size(), (2, 55))
95
94
def test__relpath_suffixed(self):
96
my_store = TextStore(MockTransport(),
97
prefixed=True, compressed=True)
95
my_store = CompressedTextStore(MockTransport(), True)
98
96
my_store.register_suffix('dsc')
99
self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
97
self.assertEqual('45/foo.dsc.gz', my_store._relpath('foo', ['dsc']))
102
100
class TestMemoryStore(TestCase):
104
102
def get_store(self):
105
return TextStore(MemoryTransport())
103
return store.ImmutableMemoryStore()
105
def test_imports(self):
106
from bzrlib.store import ImmutableMemoryStore
107
108
def test_add_and_retrieve(self):
108
109
store = self.get_store()
109
110
store.add(StringIO('hello'), 'aa')
153
154
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
156
class TestMixedTextStore(TestCaseInTempDir, TestStores):
158
def get_store(self, path=u'.', compressed=True):
159
t = LocalTransport(path)
160
return TextStore(t, compressed=compressed)
162
def test_get_mixed(self):
163
cs = self.get_store(u'.', compressed=True)
164
s = self.get_store(u'.', compressed=False)
165
cs.add(StringIO('hello there'), 'a')
167
self.failUnlessExists('a.gz')
168
self.failIf(os.path.lexists('a'))
170
self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
172
self.assertEquals(cs.has_id('a'), True)
173
self.assertEquals(s.has_id('a'), True)
174
self.assertEquals(cs.get('a').read(), 'hello there')
175
self.assertEquals(s.get('a').read(), 'hello there')
177
self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
179
s.add(StringIO('goodbye'), 'b')
180
self.failUnlessExists('b')
181
self.failIf(os.path.lexists('b.gz'))
182
self.assertEquals(open('b').read(), 'goodbye')
184
self.assertEquals(cs.has_id('b'), True)
185
self.assertEquals(s.has_id('b'), True)
186
self.assertEquals(cs.get('b').read(), 'goodbye')
187
self.assertEquals(s.get('b').read(), 'goodbye')
189
self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
191
157
class MockTransport(transport.Transport):
192
158
"""A fake transport for testing with."""
266
232
def test__relpath_simple_suffixed(self):
267
233
my_store = store.TransportStore(MockTransport())
234
my_store.register_suffix('gz')
268
235
my_store.register_suffix('bar')
269
my_store.register_suffix('baz')
270
self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
271
self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
236
self.assertEqual('foo.gz', my_store._relpath('foo', ['gz']))
237
self.assertEqual('foo.gz.bar', my_store._relpath('foo', ['gz', 'bar']))
273
239
def test__relpath_prefixed_suffixed(self):
274
240
my_store = store.TransportStore(MockTransport(), True)
241
my_store.register_suffix('gz')
275
242
my_store.register_suffix('bar')
276
my_store.register_suffix('baz')
277
self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
278
self.assertEqual('45/foo.bar.baz',
279
my_store._relpath('foo', ['bar', 'baz']))
243
self.assertEqual('45/foo.gz', my_store._relpath('foo', ['gz']))
244
self.assertEqual('45/foo.gz.bar',
245
my_store._relpath('foo', ['gz', 'bar']))
281
247
def test_add_simple(self):
282
248
stream = StringIO("content")
304
270
my_store.add(stream, "foo", 'dsc')
305
271
self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
307
def get_populated_store(self, prefixed=False,
308
store_class=TextStore, compressed=False):
309
my_store = store_class(MemoryTransport(), prefixed,
310
compressed=compressed)
273
def get_populated_store(self, prefixed=False, store_class=TextStore):
274
my_store = store_class(MemoryTransport(), prefixed)
311
275
my_store.register_suffix('sig')
312
276
stream = StringIO("signature")
313
277
my_store.add(stream, "foo", 'sig')
372
335
def test___iter__compressed(self):
373
336
self.assertEqual(set(['foo']),
374
337
set(self.get_populated_store(
375
compressed=True).__iter__()))
338
store_class=CompressedTextStore).__iter__()))
376
339
self.assertEqual(set(['foo']),
377
340
set(self.get_populated_store(
378
True, compressed=True).__iter__()))
341
True, CompressedTextStore).__iter__()))
380
343
def test___len__(self):
381
344
self.assertEqual(1, len(self.get_populated_store()))
383
346
def test_copy_suffixes(self):
384
347
from_store = self.get_populated_store()
385
to_store = TextStore(MemoryTransport(),
386
prefixed=True, compressed=True)
348
to_store = CompressedTextStore(MemoryTransport(), True)
387
349
to_store.register_suffix('sig')
388
to_store.copy_all_ids(from_store)
350
copy_all(from_store, to_store)
389
351
self.assertEqual(1, len(to_store))
390
352
self.assertEqual(set(['foo']), set(to_store.__iter__()))
391
353
self.assertEqual('content', to_store.get('foo').read())
392
354
self.assertEqual('signature', to_store.get('foo', 'sig').read())
393
355
self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
395
def test_relpath_escaped(self):
396
my_store = store.TransportStore(MemoryTransport())
397
self.assertEqual('%25', my_store._relpath('%'))
399
def test_escaped_uppercase(self):
400
"""Uppercase letters are escaped for safety on Windows"""
401
my_store = store.TransportStore(MemoryTransport(), escaped=True)
402
# a particularly perverse file-id! :-)
403
self.assertEquals(my_store._escape_file_id('C:<>'), '%43%3a%3c%3e')
406
class TestVersionFileStore(TestCaseWithTransport):
409
super(TestVersionFileStore, self).setUp()
410
self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
412
def test_get_weave_registers_dirty_in_write(self):
413
transaction = transactions.WriteTransaction()
414
vf = self.vfstore.get_weave_or_empty('id', transaction)
416
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
417
transaction = transactions.WriteTransaction()
418
vf = self.vfstore.get_weave('id', transaction)
420
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
422
def test_get_weave_or_empty_readonly_fails(self):
423
transaction = transactions.ReadOnlyTransaction()
424
vf = self.assertRaises(errors.ReadOnlyError,
425
self.vfstore.get_weave_or_empty,
429
def test_get_weave_readonly_cant_write(self):
430
transaction = transactions.WriteTransaction()
431
vf = self.vfstore.get_weave_or_empty('id', transaction)
433
transaction = transactions.ReadOnlyTransaction()
434
vf = self.vfstore.get_weave_or_empty('id', transaction)
435
self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])