19
19
from cStringIO import StringIO
22
from bzrlib.errors import BzrError, UnlistableStore
23
from bzrlib.store import copy_all
23
import bzrlib.errors as errors
24
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
24
25
from bzrlib.transport.local import LocalTransport
25
from bzrlib.transport import NoSuchFile
26
from bzrlib.store.compressed_text import CompressedTextStore
27
26
from bzrlib.store.text import TextStore
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
27
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
29
28
import bzrlib.store as store
29
import bzrlib.transactions as transactions
30
30
import bzrlib.transport as transport
31
31
from bzrlib.transport.memory import MemoryTransport
34
34
class TestStores(object):
35
"""Mixin template class that provides some common tests for stores"""
36
37
def check_content(self, store, fileid, value):
37
38
f = store.get(fileid)
50
51
store_a.add('foo', '1')
52
53
store_b = self.get_store('b')
53
copy_all(store_a, store_b)
54
store_b.copy_all_ids(store_a)
54
55
self.assertEqual(store_a.get('1').read(), 'foo')
55
56
self.assertEqual(store_b.get('1').read(), 'foo')
56
57
# TODO: Switch the exception from UnlistableStore to
80
81
# Runs the shared TestStores checks, plus two store-specific tests, against
# a compressed text store backed by a LocalTransport.
# NOTE(review): this span contains bare line-number residue and several
# statements in two adjacent variants (CompressedTextStore(...) next to
# TextStore(..., compressed=True), '.' next to u'.'). It looks like both
# sides of a diff -- confirm which variant is meant to survive.
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
82
# Build the store under test on a LocalTransport rooted at `path`.
def get_store(self, path='.'):
83
def get_store(self, path=u'.'):
83
84
t = LocalTransport(path)
84
return CompressedTextStore(t)
85
return TextStore(t, compressed=True)
86
87
# Two entries are added under the same id '123123' -- one plain, one with
# the registered 'dsc' suffix -- and total_size() is expected to report
# the pair (2, 55).
def test_total_size(self):
87
store = self.get_store('.')
88
store = self.get_store(u'.')
88
89
# The suffix is registered before it is used in add() below.
store.register_suffix('dsc')
89
90
store.add(StringIO('goodbye'), '123123')
90
91
store.add(StringIO('goodbye2'), '123123', 'dsc')
92
93
self.assertEqual(store.total_size(), (2, 55))
94
95
# Checks the relative path computed for id 'foo' with suffix 'dsc' on a
# prefixed, compressed store over MockTransport.
def test__relpath_suffixed(self):
95
my_store = CompressedTextStore(MockTransport(), True)
96
my_store = TextStore(MockTransport(),
97
prefixed=True, compressed=True)
96
98
my_store.register_suffix('dsc')
97
# NOTE(review): the two assertions below expect different paths
# ('45/foo.dsc.gz' vs '45/foo.dsc') for the same call; they cannot both
# hold for one store, so only one variant should remain.
self.assertEqual('45/foo.dsc.gz', my_store._relpath('foo', ['dsc']))
99
self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
100
102
class TestMemoryStore(TestCase):
102
104
def get_store(self):
103
return store.ImmutableMemoryStore()
105
return TextStore(MemoryTransport())
105
def test_imports(self):
106
from bzrlib.store import ImmutableMemoryStore
108
107
def test_add_and_retrieve(self):
109
108
store = self.get_store()
110
109
store.add(StringIO('hello'), 'aa')
139
138
class TestTextStore(TestCaseInTempDir, TestStores):
141
def get_store(self, path='.'):
140
def get_store(self, path=u'.'):
142
141
t = LocalTransport(path)
142
return TextStore(t, compressed=False)
145
144
def test_total_size(self):
146
145
store = self.get_store()
154
153
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
156
# Verifies that a compressed and an uncompressed TextStore sharing one
# directory can each see and read entries written by the other.
# NOTE(review): bare numbers between statements are line-number residue;
# gaps in that numbering may hide dropped lines -- confirm against the
# real file. `os` and `gzip` are used below but their imports are not
# visible in this view.
class TestMixedTextStore(TestCaseInTempDir, TestStores):
158
# Build a TextStore on a LocalTransport rooted at `path`; `compressed`
# is passed straight through to the TextStore constructor.
def get_store(self, path=u'.', compressed=True):
159
t = LocalTransport(path)
160
return TextStore(t, compressed=compressed)
162
def test_get_mixed(self):
163
# Both stores are rooted at the same directory ('.').
cs = self.get_store(u'.', compressed=True)
164
s = self.get_store(u'.', compressed=False)
165
# Part 1: write 'a' through the compressed store.
cs.add(StringIO('hello there'), 'a')
167
# Only the '.gz' file is expected on disk, not a plain 'a'.
self.failUnlessExists('a.gz')
168
self.failIf(os.path.lexists('a'))
170
# The on-disk file must be readable as gzip and hold the original text.
self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
172
# Both stores report the id and return identical content.
self.assertEquals(cs.has_id('a'), True)
173
self.assertEquals(s.has_id('a'), True)
174
self.assertEquals(cs.get('a').read(), 'hello there')
175
self.assertEquals(s.get('a').read(), 'hello there')
177
# Re-adding an existing id through the other store must fail.
self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
179
# Part 2: the mirror case -- write 'b' through the uncompressed store.
s.add(StringIO('goodbye'), 'b')
180
# Only the plain file is expected on disk, not 'b.gz'.
self.failUnlessExists('b')
181
self.failIf(os.path.lexists('b.gz'))
182
self.assertEquals(open('b').read(), 'goodbye')
184
self.assertEquals(cs.has_id('b'), True)
185
self.assertEquals(s.has_id('b'), True)
186
self.assertEquals(cs.get('b').read(), 'goodbye')
187
self.assertEquals(s.get('b').read(), 'goodbye')
189
# Re-adding 'b' through the compressed store must fail as well.
self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
157
191
class MockTransport(transport.Transport):
158
192
"""A fake transport for testing with."""
232
266
# Checks _relpath() on a TransportStore built with only a transport
# argument: the expected path is the file id followed by each requested
# suffix, joined with '.', in the order given.
# NOTE(review): bare numbers are line-number residue, and the 'gz'/'bar'
# and 'bar'/'baz' groups look like the before/after sides of a diff --
# confirm which set of suffixes is current.
def test__relpath_simple_suffixed(self):
233
267
my_store = store.TransportStore(MockTransport())
234
# Every suffix is registered before it is passed to _relpath().
my_store.register_suffix('gz')
235
268
my_store.register_suffix('bar')
236
self.assertEqual('foo.gz', my_store._relpath('foo', ['gz']))
237
self.assertEqual('foo.gz.bar', my_store._relpath('foo', ['gz', 'bar']))
269
my_store.register_suffix('baz')
270
self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
271
self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
239
273
# Same suffix checks as the non-prefixed case, but on a TransportStore
# built with a second positional argument of True; every expected path
# then starts with the directory component '45/'.
# NOTE(review): the True argument is presumably the `prefixed` flag (other
# call sites in this file pass prefixed=True by keyword) -- confirm.
# Bare numbers are line-number residue, and the 'gz'/'bar' and 'bar'/'baz'
# groups look like the before/after sides of a diff.
def test__relpath_prefixed_suffixed(self):
240
274
my_store = store.TransportStore(MockTransport(), True)
241
# Every suffix is registered before it is passed to _relpath().
my_store.register_suffix('gz')
242
275
my_store.register_suffix('bar')
243
self.assertEqual('45/foo.gz', my_store._relpath('foo', ['gz']))
244
self.assertEqual('45/foo.gz.bar',
245
my_store._relpath('foo', ['gz', 'bar']))
276
my_store.register_suffix('baz')
277
self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
278
self.assertEqual('45/foo.bar.baz',
279
my_store._relpath('foo', ['bar', 'baz']))
247
281
def test_add_simple(self):
248
282
stream = StringIO("content")
270
304
my_store.add(stream, "foo", 'dsc')
271
305
self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
273
def get_populated_store(self, prefixed=False, store_class=TextStore):
274
my_store = store_class(MemoryTransport(), prefixed)
307
def get_populated_store(self, prefixed=False,
308
store_class=TextStore, compressed=False):
309
my_store = store_class(MemoryTransport(), prefixed,
310
compressed=compressed)
275
311
my_store.register_suffix('sig')
276
312
stream = StringIO("signature")
277
313
my_store.add(stream, "foo", 'sig')
320
356
my_store.get('missing', 'sig').read())
322
358
def test___iter__no_suffix(self):
323
my_store = TextStore(MemoryTransport(), False)
359
my_store = TextStore(MemoryTransport(),
360
prefixed=False, compressed=False)
324
361
stream = StringIO("content")
325
362
my_store.add(stream, "foo")
326
363
self.assertEqual(set(['foo']),
335
372
# Iterating a populated compressed store is expected to yield exactly the
# id 'foo', both for the default (prefixed=False) layout and when True is
# passed as get_populated_store's first argument, `prefixed`.
# NOTE(review): bare numbers are line-number residue, and each call's final
# line appears in two variants (store_class=CompressedTextStore vs
# compressed=True). It looks like both sides of a diff -- confirm which
# one is current.
def test___iter__compressed(self):
336
373
self.assertEqual(set(['foo']),
337
374
set(self.get_populated_store(
338
store_class=CompressedTextStore).__iter__()))
375
compressed=True).__iter__()))
339
376
self.assertEqual(set(['foo']),
340
377
set(self.get_populated_store(
341
True, CompressedTextStore).__iter__()))
378
True, compressed=True).__iter__()))
343
380
# len() of the store returned by get_populated_store() is expected to be 1.
# (The bare numbers below are line-number residue, not statements.)
def test___len__(self):
344
381
self.assertEqual(1, len(self.get_populated_store()))
346
383
def test_copy_suffixes(self):
347
384
from_store = self.get_populated_store()
348
to_store = CompressedTextStore(MemoryTransport(), True)
385
to_store = TextStore(MemoryTransport(),
386
prefixed=True, compressed=True)
349
387
to_store.register_suffix('sig')
350
copy_all(from_store, to_store)
388
to_store.copy_all_ids(from_store)
351
389
self.assertEqual(1, len(to_store))
352
390
self.assertEqual(set(['foo']), set(to_store.__iter__()))
353
391
self.assertEqual('content', to_store.get('foo').read())
357
395
# A file id consisting of a literal '%' is expected to map to the relative
# path '%25' on a TransportStore over MemoryTransport.
# (The bare numbers below are line-number residue, not statements.)
def test_relpath_escaped(self):
358
396
my_store = store.TransportStore(MemoryTransport())
359
397
self.assertEqual('%25', my_store._relpath('%'))
399
# Exercises _escape_file_id() on a TransportStore created with escaped=True.
# (The bare numbers below are line-number residue, not statements.)
def test_escaped_uppercase(self):
400
"""Uppercase letters are escaped for safety on Windows"""
401
my_store = store.TransportStore(MemoryTransport(), escaped=True)
402
# a particularly perverse file-id! :-)
403
# Expected result: every character of 'C:<>' becomes '%' followed by its
# two-digit lowercase hex code (C=43, ':'=3a, '<'=3c, '>'=3e).
self.assertEquals(my_store._escape_file_id('C:<>'), '%43%3a%3c%3e')
406
class TestVersionFileStore(TestCaseWithTransport):
409
super(TestVersionFileStore, self).setUp()
410
self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
412
# Obtains a versioned file from self.vfstore inside a WriteTransaction --
# first via get_weave_or_empty(), then via get_weave() -- and each time
# expects vf.add_lines() to raise errors.OutSideTransaction.
# NOTE(review): bare numbers are line-number residue. The numbering skips
# between fetching `vf` and each assertion (414 -> 416, 418 -> 420), so a
# statement that ends the transaction may be missing from this view --
# confirm against the real file.
def test_get_weave_registers_dirty_in_write(self):
413
transaction = transactions.WriteTransaction()
414
vf = self.vfstore.get_weave_or_empty('id', transaction)
416
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
417
# Second pass: a fresh write transaction, this time using get_weave().
transaction = transactions.WriteTransaction()
418
vf = self.vfstore.get_weave('id', transaction)
420
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
422
def test_get_weave_or_empty_readonly_fails(self):
423
transaction = transactions.ReadOnlyTransaction()
424
vf = self.assertRaises(errors.ReadOnlyError,
425
self.vfstore.get_weave_or_empty,
429
def test_get_weave_readonly_cant_write(self):
430
transaction = transactions.WriteTransaction()
431
vf = self.vfstore.get_weave_or_empty('id', transaction)
433
transaction = transactions.ReadOnlyTransaction()
434
vf = self.vfstore.get_weave_or_empty('id', transaction)
435
self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])