19
19
from cStringIO import StringIO
23
import bzrlib.errors as errors
24
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
22
from bzrlib.errors import BzrError, UnlistableStore
23
from bzrlib.store import copy_all
25
24
from bzrlib.transport.local import LocalTransport
25
from bzrlib.transport import NoSuchFile
26
from bzrlib.store.compressed_text import CompressedTextStore
26
27
from bzrlib.store.text import TextStore
27
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
28
29
import bzrlib.store as store
29
import bzrlib.transactions as transactions
30
30
import bzrlib.transport as transport
31
31
from bzrlib.transport.memory import MemoryTransport
51
51
store_a.add('foo', '1')
53
53
store_b = self.get_store('b')
54
store_b.copy_all_ids(store_a)
54
copy_all(store_a, store_b)
55
55
self.assertEqual(store_a.get('1').read(), 'foo')
56
56
self.assertEqual(store_b.get('1').read(), 'foo')
57
57
# TODO: Switch the exception form UnlistableStore to
81
81
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
83
def get_store(self, path=u'.'):
84
t = transport.get_transport(path)
85
return TextStore(t, compressed=True)
83
def get_store(self, path='.'):
84
t = LocalTransport(path)
85
return CompressedTextStore(t)
87
87
def test_total_size(self):
88
store = self.get_store(u'.')
88
store = self.get_store('.')
89
89
store.register_suffix('dsc')
90
90
store.add(StringIO('goodbye'), '123123')
91
91
store.add(StringIO('goodbye2'), '123123', 'dsc')
93
93
self.assertEqual(store.total_size(), (2, 55))
95
95
def test__relpath_suffixed(self):
96
my_store = TextStore(MockTransport(),
97
prefixed=True, compressed=True)
96
my_store = CompressedTextStore(MockTransport(), True)
98
97
my_store.register_suffix('dsc')
99
self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
98
self.assertEqual('45/foo.dsc.gz', my_store._relpath('foo', ['dsc']))
102
101
class TestMemoryStore(TestCase):
104
103
def get_store(self):
105
return TextStore(MemoryTransport())
104
return store.ImmutableMemoryStore()
106
def test_imports(self):
107
from bzrlib.store import ImmutableMemoryStore
107
109
def test_add_and_retrieve(self):
108
110
store = self.get_store()
109
111
store.add(StringIO('hello'), 'aa')
138
140
class TestTextStore(TestCaseInTempDir, TestStores):
140
def get_store(self, path=u'.'):
141
t = transport.get_transport(path)
142
return TextStore(t, compressed=False)
142
def get_store(self, path='.'):
143
t = LocalTransport(path)
144
146
def test_total_size(self):
145
147
store = self.get_store()
153
155
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
156
class TestMixedTextStore(TestCaseInTempDir, TestStores):
158
def get_store(self, path=u'.', compressed=True):
159
t = transport.get_transport(path)
160
return TextStore(t, compressed=compressed)
162
def test_get_mixed(self):
163
cs = self.get_store(u'.', compressed=True)
164
s = self.get_store(u'.', compressed=False)
165
cs.add(StringIO('hello there'), 'a')
167
self.failUnlessExists('a.gz')
168
self.failIf(os.path.lexists('a'))
170
self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
172
self.assertEquals(cs.has_id('a'), True)
173
self.assertEquals(s.has_id('a'), True)
174
self.assertEquals(cs.get('a').read(), 'hello there')
175
self.assertEquals(s.get('a').read(), 'hello there')
177
self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
179
s.add(StringIO('goodbye'), 'b')
180
self.failUnlessExists('b')
181
self.failIf(os.path.lexists('b.gz'))
182
self.assertEquals(open('b').read(), 'goodbye')
184
self.assertEquals(cs.has_id('b'), True)
185
self.assertEquals(s.has_id('b'), True)
186
self.assertEquals(cs.get('b').read(), 'goodbye')
187
self.assertEquals(s.get('b').read(), 'goodbye')
189
self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
191
158
class MockTransport(transport.Transport):
192
159
"""A fake transport for testing with."""
266
233
def test__relpath_simple_suffixed(self):
267
234
my_store = store.TransportStore(MockTransport())
235
my_store.register_suffix('gz')
268
236
my_store.register_suffix('bar')
269
my_store.register_suffix('baz')
270
self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
271
self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
237
self.assertEqual('foo.gz', my_store._relpath('foo', ['gz']))
238
self.assertEqual('foo.gz.bar', my_store._relpath('foo', ['gz', 'bar']))
273
240
def test__relpath_prefixed_suffixed(self):
274
241
my_store = store.TransportStore(MockTransport(), True)
242
my_store.register_suffix('gz')
275
243
my_store.register_suffix('bar')
276
my_store.register_suffix('baz')
277
self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
278
self.assertEqual('45/foo.bar.baz',
279
my_store._relpath('foo', ['bar', 'baz']))
244
self.assertEqual('45/foo.gz', my_store._relpath('foo', ['gz']))
245
self.assertEqual('45/foo.gz.bar',
246
my_store._relpath('foo', ['gz', 'bar']))
281
248
def test_add_simple(self):
282
249
stream = StringIO("content")
304
271
my_store.add(stream, "foo", 'dsc')
305
272
self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
307
def get_populated_store(self, prefixed=False,
308
store_class=TextStore, compressed=False):
309
my_store = store_class(MemoryTransport(), prefixed,
310
compressed=compressed)
274
def get_populated_store(self, prefixed=False, store_class=TextStore):
275
my_store = store_class(MemoryTransport(), prefixed)
311
276
my_store.register_suffix('sig')
312
277
stream = StringIO("signature")
313
278
my_store.add(stream, "foo", 'sig')
356
321
my_store.get('missing', 'sig').read())
358
323
def test___iter__no_suffix(self):
359
my_store = TextStore(MemoryTransport(),
360
prefixed=False, compressed=False)
324
my_store = TextStore(MemoryTransport(), False)
361
325
stream = StringIO("content")
362
326
my_store.add(stream, "foo")
363
327
self.assertEqual(set(['foo']),
372
336
def test___iter__compressed(self):
373
337
self.assertEqual(set(['foo']),
374
338
set(self.get_populated_store(
375
compressed=True).__iter__()))
339
store_class=CompressedTextStore).__iter__()))
376
340
self.assertEqual(set(['foo']),
377
341
set(self.get_populated_store(
378
True, compressed=True).__iter__()))
342
True, CompressedTextStore).__iter__()))
380
344
def test___len__(self):
381
345
self.assertEqual(1, len(self.get_populated_store()))
383
347
def test_copy_suffixes(self):
384
348
from_store = self.get_populated_store()
385
to_store = TextStore(MemoryTransport(),
386
prefixed=True, compressed=True)
349
to_store = CompressedTextStore(MemoryTransport(), True)
387
350
to_store.register_suffix('sig')
388
to_store.copy_all_ids(from_store)
351
copy_all(from_store, to_store)
389
352
self.assertEqual(1, len(to_store))
390
353
self.assertEqual(set(['foo']), set(to_store.__iter__()))
391
354
self.assertEqual('content', to_store.get('foo').read())
395
358
def test_relpath_escaped(self):
396
359
my_store = store.TransportStore(MemoryTransport())
397
360
self.assertEqual('%25', my_store._relpath('%'))
399
def test_escaped_uppercase(self):
400
"""Uppercase letters are escaped for safety on Windows"""
401
my_store = store.TransportStore(MemoryTransport(), escaped=True)
402
# a particularly perverse file-id! :-)
403
self.assertEquals(my_store._escape_file_id('C:<>'), '%43%3a%3c%3e')
406
class TestVersionFileStore(TestCaseWithTransport):
409
super(TestVersionFileStore, self).setUp()
410
self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
412
def test_get_weave_registers_dirty_in_write(self):
413
transaction = transactions.WriteTransaction()
414
vf = self.vfstore.get_weave_or_empty('id', transaction)
416
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
417
transaction = transactions.WriteTransaction()
418
vf = self.vfstore.get_weave('id', transaction)
420
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
422
def test_get_weave_or_empty_readonly_fails(self):
423
transaction = transactions.ReadOnlyTransaction()
424
vf = self.assertRaises(errors.ReadOnlyError,
425
self.vfstore.get_weave_or_empty,
429
def test_get_weave_readonly_cant_write(self):
430
transaction = transactions.WriteTransaction()
431
vf = self.vfstore.get_weave_or_empty('id', transaction)
433
transaction = transactions.ReadOnlyTransaction()
434
vf = self.vfstore.get_weave_or_empty('id', transaction)
435
self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])