1
# Copyright (C) 2005 by Canonical Development Ltd
1
# Copyright (C) 2005-2009, 2011 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""Test Store implementations."""
23
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
24
from bzrlib.store import copy_all
25
from bzrlib.transport.local import LocalTransport
23
import bzrlib.errors as errors
24
from bzrlib.errors import BzrError
25
from bzrlib.store import TransportStore
26
26
from bzrlib.store.text import TextStore
27
from bzrlib.tests import TestCase, TestCaseInTempDir
28
import bzrlib.store as store
27
from bzrlib.store.versioned import VersionedFileStore
28
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
29
import bzrlib.transactions as transactions
29
30
import bzrlib.transport as transport
30
31
from bzrlib.transport.memory import MemoryTransport
32
from bzrlib.weave import WeaveFile
33
35
class TestStores(object):
49
51
store_a = self.get_store('a')
50
store_a.add('foo', '1')
52
store_a.add(StringIO('foo'), '1')
52
54
store_b = self.get_store('b')
53
copy_all(store_a, store_b)
55
store_b.copy_all_ids(store_a)
54
56
self.assertEqual(store_a.get('1').read(), 'foo')
55
57
self.assertEqual(store_b.get('1').read(), 'foo')
56
58
# TODO: Switch the exception from UnlistableStore to
62
64
def test_get(self):
    """Check texts read back from a filled store, and that a missing id fails.

    The store comes from ``self.get_store()`` and is populated by
    ``self.fill_store``; ``self.check_content`` is then used to verify the
    text recorded under the ids 'a', 'b' and 'c'.
    """
    store = self.get_store()
    self.fill_store(store)

    self.check_content(store, 'a', 'hello')
    self.check_content(store, 'b', 'other')
    self.check_content(store, 'c', 'something')

    # Make sure that requesting a non-existing file fails
    self.assertRaises(KeyError, self.check_content, store, 'd', None)
90
92
store.add(StringIO('goodbye2'), '123123', 'dsc')
91
93
# these get gzipped - content should be stable
92
94
self.assertEqual(store.total_size(), (2, 55))
94
96
def test__relpath_suffixed(self):
95
97
my_store = TextStore(MockTransport(),
96
98
prefixed=True, compressed=True)
166
165
s = self.get_store(u'.', compressed=False)
167
166
cs.add(StringIO('hello there'), 'a')
169
self.failUnlessExists('a.gz')
170
self.failIf(os.path.lexists('a'))
168
self.assertPathExists('a.gz')
169
self.assertFalse(os.path.lexists('a'))
172
171
self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
175
174
self.assertEquals(s.has_id('a'), True)
176
175
self.assertEquals(cs.get('a').read(), 'hello there')
177
176
self.assertEquals(s.get('a').read(), 'hello there')
179
178
self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
181
180
s.add(StringIO('goodbye'), 'b')
182
self.failUnlessExists('b')
183
self.failIf(os.path.lexists('b.gz'))
181
self.assertPathExists('b')
182
self.assertFalse(os.path.lexists('b.gz'))
184
183
self.assertEquals(open('b').read(), 'goodbye')
186
185
self.assertEquals(cs.has_id('b'), True)
187
186
self.assertEquals(s.has_id('b'), True)
188
187
self.assertEquals(cs.get('b').read(), 'goodbye')
189
188
self.assertEquals(s.get('b').read(), 'goodbye')
191
190
self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
193
192
class MockTransport(transport.Transport):
243
242
class TestTransportStore(TestCase):
245
244
def test__relpath_invalid(self):
    """_relpath must raise ValueError for the ids '/foo' and 'foo/'."""
    # Only one store is built: an earlier, immediately overwritten
    # ``store.TransportStore(...)`` assignment was dead code.
    my_store = TransportStore(MockTransport())
    self.assertRaises(ValueError, my_store._relpath, '/foo')
    self.assertRaises(ValueError, my_store._relpath, 'foo/')
250
249
def test_register_invalid_suffixes(self):
    """register_suffix must raise ValueError for '/' and '.gz/bar'."""
    # Only one store is built: an earlier, immediately overwritten
    # ``store.TransportStore(...)`` assignment was dead code.
    my_store = TransportStore(MockTransport())
    self.assertRaises(ValueError, my_store.register_suffix, '/')
    self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
255
254
def test__relpath_unregister_suffixes(self):
    """_relpath must raise ValueError for suffixes this test never registered.

    No ``register_suffix`` call is made on the store before asking for the
    suffix lists ['gz'] and ['dsc', 'gz'].
    """
    # Only one store is built: an earlier, immediately overwritten
    # ``store.TransportStore(...)`` assignment was dead code.
    my_store = TransportStore(MockTransport())
    self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
    self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
260
259
def test__relpath_simple(self):
    """_relpath('foo') on a default-constructed store is expected to be 'foo'."""
    # Only one store is built: an earlier, immediately overwritten
    # ``store.TransportStore(...)`` assignment was dead code.
    my_store = TransportStore(MockTransport())
    self.assertEqual("foo", my_store._relpath('foo'))
264
263
def test__relpath_prefixed(self):
    """_relpath('foo') is expected to be '45/foo' on this store.

    The store is built with True as its second positional argument
    (presumably the ``prefixed`` flag -- confirm against TransportStore).
    """
    # Only one store is built: an earlier, immediately overwritten
    # ``store.TransportStore(...)`` assignment was dead code.
    my_store = TransportStore(MockTransport(), True)
    self.assertEqual('45/foo', my_store._relpath('foo'))
268
267
def test__relpath_simple_suffixed(self):
    """Registered suffixes are appended to the id, joined by dots.

    After registering 'bar' and 'baz', 'foo' with ['baz'] is expected to
    map to 'foo.baz' and with ['bar', 'baz'] to 'foo.bar.baz'.
    """
    # Only one store is built: an earlier, immediately overwritten
    # ``store.TransportStore(...)`` assignment was dead code.
    my_store = TransportStore(MockTransport())
    my_store.register_suffix('bar')
    my_store.register_suffix('baz')
    self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
    self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
275
274
def test__relpath_prefixed_suffixed(self):
276
my_store = store.TransportStore(MockTransport(), True)
275
my_store = TransportStore(MockTransport(), True)
277
276
my_store.register_suffix('bar')
278
277
my_store.register_suffix('baz')
279
278
self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
298
297
my_store.register_suffix('dsc')
299
298
my_store.add(stream, "foo", 'dsc')
300
299
self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
302
301
def test_add_simple_suffixed(self):
303
302
stream = StringIO("content")
304
303
my_store = InstrumentedTransportStore(MockTransport(), True)
387
386
to_store = TextStore(MemoryTransport(),
388
387
prefixed=True, compressed=True)
389
388
to_store.register_suffix('sig')
390
copy_all(from_store, to_store)
389
to_store.copy_all_ids(from_store)
391
390
self.assertEqual(1, len(to_store))
392
391
self.assertEqual(set(['foo']), set(to_store.__iter__()))
393
392
self.assertEqual('content', to_store.get('foo').read())
395
394
self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
397
396
def test_relpath_escaped(self):
    """_relpath('%') on a default-constructed store is expected to be '%25'."""
    # Only one store is built: an earlier, immediately overwritten
    # ``store.TransportStore(...)`` assignment was dead code.
    my_store = TransportStore(MemoryTransport())
    self.assertEqual('%25', my_store._relpath('%'))
400
def test_escaped_uppercase(self):
401
"""Uppercase letters are escaped for safety on Windows"""
402
my_store = TransportStore(MemoryTransport(), prefixed=True,
404
# a particularly perverse file-id! :-)
405
self.assertEquals(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
408
class TestVersionFileStore(TestCaseWithTransport):
411
return self._transaction
414
super(TestVersionFileStore, self).setUp()
415
self.vfstore = VersionedFileStore(MemoryTransport(),
416
versionedfile_class=WeaveFile)
417
self.vfstore.get_scope = self.get_scope
418
self._transaction = None
420
def test_get_weave_registers_dirty_in_write(self):
    """A weave fetched in a write transaction rejects writes after it ends.

    The same sequence is run twice, once through ``get_weave_or_empty``
    and once through ``get_weave``: open a WriteTransaction, fetch the
    weave 'id', finish the transaction, reset ``self._transaction`` to
    None, then assert that ``add_lines`` raises errors.OutSideTransaction.
    """
    # First pass: weave obtained with get_weave_or_empty.
    self._transaction = transactions.WriteTransaction()
    vf = self.vfstore.get_weave_or_empty('id', self._transaction)
    self._transaction.finish()
    self._transaction = None
    self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])

    # Second pass: weave obtained with get_weave.
    self._transaction = transactions.WriteTransaction()
    vf = self.vfstore.get_weave('id', self._transaction)
    self._transaction.finish()
    self._transaction = None
    self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
432
def test_get_weave_readonly_cant_write(self):
    """A weave fetched in a read-only transaction must refuse add_lines.

    The weave 'id' is first fetched with ``get_weave_or_empty`` under a
    WriteTransaction that is then finished. It is fetched again under a
    ReadOnlyTransaction, and ``add_lines`` on that second handle is
    asserted to raise errors.ReadOnlyError.
    """
    self._transaction = transactions.WriteTransaction()
    vf = self.vfstore.get_weave_or_empty('id', self._transaction)
    self._transaction.finish()

    # The read-only transaction is left open for the rest of the test.
    self._transaction = transactions.ReadOnlyTransaction()
    vf = self.vfstore.get_weave_or_empty('id', self._transaction)
    self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
440
def test___iter__escaped(self):
    """Iterating a prefixed, escaped store yields the id ' ' that was added.

    Replaces ``self.vfstore`` with a VersionedFileStore built with
    ``prefixed=True, escaped=True``, adds version 'a' to the weave whose
    id is a single space, finishes the write transaction and asserts that
    ``list(self.vfstore)`` equals ``[' ']``.
    """
    self.vfstore = VersionedFileStore(MemoryTransport(),
        prefixed=True, escaped=True, versionedfile_class=WeaveFile)
    self.vfstore.get_scope = self.get_scope
    self._transaction = transactions.WriteTransaction()
    vf = self.vfstore.get_weave_or_empty(' ', self._transaction)
    vf.add_lines('a', [], [])
    # NOTE(review): the captured listing skips one line number here, between
    # add_lines() and finish() -- confirm no statement was dropped.
    self._transaction.finish()
    self.assertEqual([' '], list(self.vfstore))