1
# Copyright (C) 2005 by Canonical Development Ltd
1
# Copyright (C) 2005-2011, 2016 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""Test Store implementations."""
19
19
from cStringIO import StringIO
22
from bzrlib.errors import BzrError, UnlistableStore
23
from bzrlib.store import copy_all
24
from bzrlib.transport.local import LocalTransport
25
from bzrlib.transport import NoSuchFile
26
from bzrlib.store.compressed_text import CompressedTextStore
23
import bzrlib.errors as errors
24
from bzrlib.errors import BzrError
25
from bzrlib.store import TransportStore
27
26
from bzrlib.store.text import TextStore
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
29
import bzrlib.store as store
27
from bzrlib.store.versioned import VersionedFileStore
28
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
29
import bzrlib.transactions as transactions
30
30
import bzrlib.transport as transport
31
31
from bzrlib.transport.memory import MemoryTransport
32
from bzrlib.weave import WeaveFile
34
35
class TestStores(object):
36
"""Mixin template class that provides some common tests for stores"""
36
38
def check_content(self, store, fileid, value):
37
39
f = store.get(fileid)
80
82
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
    """Run the shared store tests against a TextStore with compressed=True."""

    def get_store(self, path=u'.'):
        """Return a compressed TextStore over the local directory `path`."""
        t = transport.get_transport_from_path(path)
        return TextStore(t, compressed=True)

    def test_total_size(self):
        """total_size() reports the entry count and the stored byte total."""
        # Named text_store rather than store so the local does not shadow
        # the bzrlib.store module alias imported at file level.
        text_store = self.get_store(u'.')
        text_store.register_suffix('dsc')
        text_store.add(StringIO('goodbye'), '123123')
        text_store.add(StringIO('goodbye2'), '123123', 'dsc')
        # these get gzipped - content should be stable
        self.assertEqual(text_store.total_size(), (2, 55))

    def test__relpath_suffixed(self):
        """_relpath() of a prefixed, compressed store with a suffix."""
        my_store = TextStore(MockTransport(),
                             prefixed=True, compressed=True)
        my_store.register_suffix('dsc')
        # The asserted path carries the prefix directory and the registered
        # suffix only; no '.gz' component is expected from _relpath().
        self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
100
103
class TestMemoryStore(TestCase):
102
105
def get_store(self):
    """Return a new TextStore built on a fresh MemoryTransport."""
    return TextStore(MemoryTransport())
108
108
def test_add_and_retrieve(self):
109
109
store = self.get_store()
154
154
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
157
class TestMixedTextStore(TestCaseInTempDir, TestStores):
    """Check compressed and uncompressed TextStores over one directory."""

    def get_store(self, path=u'.', compressed=True):
        """Return a TextStore over the local directory `path`.

        :param path: Directory handed to transport.get_transport_from_path.
        :param compressed: Passed straight through to TextStore.
        """
        t = transport.get_transport_from_path(path)
        return TextStore(t, compressed=compressed)

    def test_get_mixed(self):
        """Entries written by either store are readable through both.

        A compressed and an uncompressed store are opened on the same
        directory. Each one must see and read ids added by the other, and
        adding an id that already exists in the other form raises BzrError.
        """
        cs = self.get_store(u'.', compressed=True)
        s = self.get_store(u'.', compressed=False)
        cs.add(StringIO('hello there'), 'a')

        self.assertPathExists('a.gz')
        self.assertFalse(os.path.lexists('a'))

        # Close the handle explicitly so the file is not left open while
        # the rest of the test (and the temp-dir cleanup) runs.
        gz_file = gzip.GzipFile('a.gz')
        try:
            self.assertEqual(gz_file.read(), 'hello there')
        finally:
            gz_file.close()

        self.assertEqual(cs.has_id('a'), True)
        self.assertEqual(s.has_id('a'), True)
        self.assertEqual(cs.get('a').read(), 'hello there')
        self.assertEqual(s.get('a').read(), 'hello there')

        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')

        s.add(StringIO('goodbye'), 'b')
        self.assertPathExists('b')
        self.assertFalse(os.path.lexists('b.gz'))

        # Same reasoning as above: do not leave the plain file open.
        plain_file = open('b')
        try:
            self.assertEqual(plain_file.read(), 'goodbye')
        finally:
            plain_file.close()

        self.assertEqual(cs.has_id('b'), True)
        self.assertEqual(s.has_id('b'), True)
        self.assertEqual(cs.get('b').read(), 'goodbye')
        self.assertEqual(s.get('b').read(), 'goodbye')

        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
157
192
class MockTransport(transport.Transport):
158
193
"""A fake transport for testing with."""
207
242
class TestTransportStore(TestCase):
209
244
def test__relpath_invalid(self):
    """_relpath() raises ValueError for ids with a leading or trailing '/'."""
    my_store = TransportStore(MockTransport())
    self.assertRaises(ValueError, my_store._relpath, '/foo')
    self.assertRaises(ValueError, my_store._relpath, 'foo/')
214
249
def test_register_invalid_suffixes(self):
    """register_suffix() raises ValueError for suffixes containing '/'."""
    my_store = TransportStore(MockTransport())
    self.assertRaises(ValueError, my_store.register_suffix, '/')
    self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
219
254
def test__relpath_unregister_suffixes(self):
    """_relpath() raises ValueError for suffixes that were never registered."""
    my_store = TransportStore(MockTransport())
    self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
    self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
224
259
def test__relpath_simple(self):
    """_relpath() of a plain id in a default store is the id itself."""
    my_store = TransportStore(MockTransport())
    self.assertEqual("foo", my_store._relpath('foo'))
228
263
def test__relpath_prefixed(self):
    """_relpath() of a store built with prefixed=True adds a directory part."""
    my_store = TransportStore(MockTransport(), True)
    self.assertEqual('45/foo', my_store._relpath('foo'))
232
267
def test__relpath_simple_suffixed(self):
    """_relpath() appends registered suffixes in the order they are given."""
    my_store = TransportStore(MockTransport())
    my_store.register_suffix('bar')
    my_store.register_suffix('baz')
    self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
    self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
239
274
def test__relpath_prefixed_suffixed(self):
    """_relpath() combines the prefix directory with registered suffixes."""
    my_store = TransportStore(MockTransport(), True)
    my_store.register_suffix('bar')
    my_store.register_suffix('baz')
    self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
    self.assertEqual('45/foo.bar.baz',
                     my_store._relpath('foo', ['bar', 'baz']))
247
282
def test_add_simple(self):
248
283
stream = StringIO("content")
270
305
my_store.add(stream, "foo", 'dsc')
271
306
self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
273
def get_populated_store(self, prefixed=False, store_class=TextStore):
274
my_store = store_class(MemoryTransport(), prefixed)
308
def get_populated_store(self, prefixed=False,
309
store_class=TextStore, compressed=False):
310
my_store = store_class(MemoryTransport(), prefixed,
311
compressed=compressed)
275
312
my_store.register_suffix('sig')
276
313
stream = StringIO("signature")
277
314
my_store.add(stream, "foo", 'sig')
335
373
def test___iter__compressed(self):
    """Iterating a compressed populated store yields only the id 'foo'.

    Checked twice: once with the default prefixed setting and once with
    prefixed passed as True.
    """
    self.assertEqual(set(['foo']),
                     set(self.get_populated_store(
                         compressed=True).__iter__()))
    self.assertEqual(set(['foo']),
                     set(self.get_populated_store(
                         True, compressed=True).__iter__()))
343
381
def test___len__(self):
    """len() of the populated store is 1."""
    self.assertEqual(1, len(self.get_populated_store()))
346
384
def test_copy_suffixes(self):
    """copy_all_ids() carries over both the content and its 'sig' suffix.

    The source is the populated store; the target is a prefixed,
    compressed TextStore with the 'sig' suffix registered.
    """
    from_store = self.get_populated_store()
    to_store = TextStore(MemoryTransport(),
                         prefixed=True, compressed=True)
    to_store.register_suffix('sig')
    to_store.copy_all_ids(from_store)
    self.assertEqual(1, len(to_store))
    self.assertEqual(set(['foo']), set(to_store.__iter__()))
    self.assertEqual('content', to_store.get('foo').read())
    self.assertEqual('signature', to_store.get('foo', 'sig').read())
    # An id that was never copied must not be found under the suffix.
    self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
396
def test_relpath_escaped(self):
    """_relpath() of the id '%' is '%25'."""
    my_store = TransportStore(MemoryTransport())
    self.assertEqual('%25', my_store._relpath('%'))
400
def test_escaped_uppercase(self):
401
"""Uppercase letters are escaped for safety on Windows"""
402
my_store = TransportStore(MemoryTransport(), prefixed=True,
404
# a particularly perverse file-id! :-)
405
self.assertEqual(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
408
class TestVersionFileStore(TestCaseWithTransport):
411
return self._transaction
414
super(TestVersionFileStore, self).setUp()
415
self.vfstore = VersionedFileStore(MemoryTransport(),
416
versionedfile_class=WeaveFile)
417
self.vfstore.get_scope = self.get_scope
418
self._transaction = None
420
def test_get_weave_registers_dirty_in_write(self):
    """A weave from a finished write transaction rejects add_lines().

    Checked for both get_weave_or_empty() and get_weave(): once the
    transaction is finished and cleared, add_lines() must raise
    errors.OutSideTransaction.
    """
    self._transaction = transactions.WriteTransaction()
    vf = self.vfstore.get_weave_or_empty('id', self._transaction)
    self._transaction.finish()
    self._transaction = None
    self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
    self._transaction = transactions.WriteTransaction()
    vf = self.vfstore.get_weave('id', self._transaction)
    self._transaction.finish()
    self._transaction = None
    self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
432
def test_get_weave_readonly_cant_write(self):
    """A weave from a read-only transaction raises ReadOnlyError on write."""
    # First fetch the weave under a write transaction and finish it.
    # NOTE(review): presumably this creates the weave so the read-only
    # lookup below finds it - confirm against VersionedFileStore.
    self._transaction = transactions.WriteTransaction()
    vf = self.vfstore.get_weave_or_empty('id', self._transaction)
    self._transaction.finish()
    self._transaction = transactions.ReadOnlyTransaction()
    vf = self.vfstore.get_weave_or_empty('id', self._transaction)
    self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
440
def test___iter__escaped(self):
    """Iterating a prefixed, escaped store yields the original id ' '."""
    # Replace the fixture store with one built with prefixed=True and
    # escaped=True, re-attaching the test's get_scope hook to it.
    self.vfstore = VersionedFileStore(MemoryTransport(),
        prefixed=True, escaped=True, versionedfile_class=WeaveFile)
    self.vfstore.get_scope = self.get_scope
    self._transaction = transactions.WriteTransaction()
    vf = self.vfstore.get_weave_or_empty(' ', self._transaction)
    vf.add_lines('a', [], [])
    self._transaction.finish()
    self.assertEqual([' '], list(self.vfstore))