1
# Copyright (C) 2005 by Canonical Development Ltd
1
# Copyright (C) 2005-2009, 2011 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""Test Store implementations."""
23
23
import bzrlib.errors as errors
24
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
25
from bzrlib.transport.local import LocalTransport
24
from bzrlib.errors import BzrError
25
from bzrlib.store import TransportStore
26
26
from bzrlib.store.text import TextStore
27
from bzrlib.store.versioned import VersionedFileStore
27
28
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
28
import bzrlib.store as store
29
29
import bzrlib.transactions as transactions
30
30
import bzrlib.transport as transport
31
31
from bzrlib.transport.memory import MemoryTransport
32
from bzrlib.weave import WeaveFile
34
35
class TestStores(object):
176
174
self.assertEquals(s.has_id('a'), True)
177
175
self.assertEquals(cs.get('a').read(), 'hello there')
178
176
self.assertEquals(s.get('a').read(), 'hello there')
180
178
self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
182
180
s.add(StringIO('goodbye'), 'b')
183
self.failUnlessExists('b')
184
self.failIf(os.path.lexists('b.gz'))
181
self.assertPathExists('b')
182
self.assertFalse(os.path.lexists('b.gz'))
185
183
self.assertEquals(open('b').read(), 'goodbye')
187
185
self.assertEquals(cs.has_id('b'), True)
188
186
self.assertEquals(s.has_id('b'), True)
189
187
self.assertEquals(cs.get('b').read(), 'goodbye')
190
188
self.assertEquals(s.get('b').read(), 'goodbye')
192
190
self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
194
192
class MockTransport(transport.Transport):
244
242
class TestTransportStore(TestCase):
246
244
def test__relpath_invalid(self):
    """_relpath rejects ids with a leading or trailing slash."""
    my_store = TransportStore(MockTransport())
    # Both a leading and a trailing '/' must be refused with ValueError.
    self.assertRaises(ValueError, my_store._relpath, '/foo')
    self.assertRaises(ValueError, my_store._relpath, 'foo/')
251
249
def test_register_invalid_suffixes(self):
    """register_suffix raises ValueError for suffixes containing '/'."""
    my_store = TransportStore(MockTransport())
    # A bare separator and a separator embedded in a longer suffix are
    # both expected to be refused.
    self.assertRaises(ValueError, my_store.register_suffix, '/')
    self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
256
254
def test__relpath_unregister_suffixes(self):
    """_relpath raises ValueError for suffixes that were never registered."""
    # No register_suffix() call is made on this store, so every suffix
    # passed below is unknown to it.
    my_store = TransportStore(MockTransport())
    self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
    self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
261
259
def test__relpath_simple(self):
    """With a default-constructed store, _relpath('foo') is just 'foo'."""
    my_store = TransportStore(MockTransport())
    self.assertEqual("foo", my_store._relpath('foo'))
265
263
def test__relpath_prefixed(self):
    """A store built with True as second argument maps 'foo' to '45/foo'."""
    # NOTE(review): the positional True is presumably the `prefixed` flag
    # (it is passed by keyword elsewhere in this file) -- confirm against
    # the TransportStore constructor signature.
    my_store = TransportStore(MockTransport(), True)
    self.assertEqual('45/foo', my_store._relpath('foo'))
269
267
def test__relpath_simple_suffixed(self):
    """Registered suffixes are appended to the id, joined by '.'."""
    my_store = TransportStore(MockTransport())
    my_store.register_suffix('bar')
    my_store.register_suffix('baz')
    # One suffix, then two suffixes in the order they were requested.
    self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
    self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
276
274
def test__relpath_prefixed_suffixed(self):
277
my_store = store.TransportStore(MockTransport(), True)
275
my_store = TransportStore(MockTransport(), True)
278
276
my_store.register_suffix('bar')
279
277
my_store.register_suffix('baz')
280
278
self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
396
394
self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
398
396
def test_relpath_escaped(self):
    """A '%' in the id comes back from _relpath as '%25'."""
    my_store = TransportStore(MemoryTransport())
    self.assertEqual('%25', my_store._relpath('%'))
400
def test_escaped_uppercase(self):
401
"""Uppercase letters are escaped for safety on Windows"""
402
my_store = TransportStore(MemoryTransport(), prefixed=True,
404
# a particularly perverse file-id! :-)
405
self.assertEquals(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
403
408
class TestVersionFileStore(TestCaseWithTransport):
411
return self._transaction
406
414
super(TestVersionFileStore, self).setUp()
407
self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
415
self.vfstore = VersionedFileStore(MemoryTransport(),
416
versionedfile_class=WeaveFile)
417
self.vfstore.get_scope = self.get_scope
418
self._transaction = None
409
420
def test_get_weave_registers_dirty_in_write(self):
410
transaction = transactions.WriteTransaction()
411
vf = self.vfstore.get_weave_or_empty('id', transaction)
413
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
414
transaction = transactions.WriteTransaction()
415
vf = self.vfstore.get_weave('id', transaction)
417
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
419
def test_get_weave_or_empty_readonly_fails(self):
420
transaction = transactions.ReadOnlyTransaction()
421
vf = self.assertRaises(errors.ReadOnlyError,
422
self.vfstore.get_weave_or_empty,
421
self._transaction = transactions.WriteTransaction()
422
vf = self.vfstore.get_weave_or_empty('id', self._transaction)
423
self._transaction.finish()
424
self._transaction = None
425
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
426
self._transaction = transactions.WriteTransaction()
427
vf = self.vfstore.get_weave('id', self._transaction)
428
self._transaction.finish()
429
self._transaction = None
430
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
426
432
# Checks that add_lines on a versioned file obtained under a
# ReadOnlyTransaction raises errors.ReadOnlyError.
# NOTE(review): the bare integers and the two parallel statement sequences
# below (local `transaction` vs. `self._transaction` with finish()) look like
# residue of a side-by-side diff of two revisions -- confirm which revision is
# intended before relying on this block.
def test_get_weave_readonly_cant_write(self):
427
transaction = transactions.WriteTransaction()
428
vf = self.vfstore.get_weave_or_empty('id', transaction)
430
transaction = transactions.ReadOnlyTransaction()
431
vf = self.vfstore.get_weave_or_empty('id', transaction)
433
# First request the 'id' weave under a write transaction, then finish it.
self._transaction = transactions.WriteTransaction()
434
vf = self.vfstore.get_weave_or_empty('id', self._transaction)
435
self._transaction.finish()
436
# Request the same id again, this time under a read-only transaction.
self._transaction = transactions.ReadOnlyTransaction()
437
vf = self.vfstore.get_weave_or_empty('id', self._transaction)
432
438
# Writing through the read-only handle must be refused.
self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
440
def test___iter__escaped(self):
441
self.vfstore = VersionedFileStore(MemoryTransport(),
442
prefixed=True, escaped=True, versionedfile_class=WeaveFile)
443
self.vfstore.get_scope = self.get_scope
444
self._transaction = transactions.WriteTransaction()
445
vf = self.vfstore.get_weave_or_empty(' ', self._transaction)
446
vf.add_lines('a', [], [])
448
self._transaction.finish()
449
self.assertEqual([' '], list(self.vfstore))