1
# Copyright (C) 2005, 2007 Canonical Ltd
1
# Copyright (C) 2005 by Canonical Development Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Test Store implementations."""
23
import bzrlib.errors as errors
24
23
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
24
from bzrlib.store import copy_all
25
25
from bzrlib.transport.local import LocalTransport
26
26
from bzrlib.store.text import TextStore
27
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
27
from bzrlib.tests import TestCase, TestCaseInTempDir
28
28
import bzrlib.store as store
29
import bzrlib.store.versioned
30
import bzrlib.transactions as transactions
31
29
import bzrlib.transport as transport
32
30
from bzrlib.transport.memory import MemoryTransport
51
49
store_a = self.get_store('a')
52
store_a.add(StringIO('foo'), '1')
50
store_a.add('foo', '1')
54
52
store_b = self.get_store('b')
55
store_b.copy_all_ids(store_a)
53
copy_all(store_a, store_b)
56
54
self.assertEqual(store_a.get('1').read(), 'foo')
57
55
self.assertEqual(store_b.get('1').read(), 'foo')
58
56
# TODO: Switch the exception from UnlistableStore to
64
62
def test_get(self):
# Obtain a store via get_store(), populate it with fill_store(), then check
# ids 'a', 'b' and 'c' through check_content.
# NOTE(review): fill_store and check_content are defined outside this view;
# presumably check_content compares the stored text with the expected
# string -- confirm against the helper.
# NOTE(review): the embedded line numbering skips one line after fill_store
# and one before the comment below (probably blank lines) -- confirm nothing
# is missing from this view.
65
63
store = self.get_store()
66
64
self.fill_store(store)
68
66
self.check_content(store, 'a', 'hello')
69
67
self.check_content(store, 'b', 'other')
70
68
self.check_content(store, 'c', 'something')
72
70
# Make sure that requesting a non-existing file fails
# (id 'd' is not among the ids checked above; KeyError is the expected error)
73
71
self.assertRaises(KeyError, self.check_content, store, 'd', None)
92
90
store.add(StringIO('goodbye2'), '123123', 'dsc')
93
91
# these get gzipped - content should be stable
94
92
self.assertEqual(store.total_size(), (2, 55))
96
94
def test__relpath_suffixed(self):
97
95
my_store = TextStore(MockTransport(),
98
96
prefixed=True, compressed=True)
157
158
class TestMixedTextStore(TestCaseInTempDir, TestStores):
159
160
def get_store(self, path=u'.', compressed=True):
# Return a TextStore built on a transport for `path`, forwarding the
# `compressed` flag to the TextStore constructor.
# NOTE(review): `t` is bound twice below -- first via
# transport.get_transport(path), then via LocalTransport(path). Read top to
# bottom, the second binding overwrites the first. This looks like two
# revisions of the same statement; confirm which one is intended and drop
# the other.
160
t = transport.get_transport(path)
161
t = LocalTransport(path)
161
162
return TextStore(t, compressed=compressed)
163
164
def test_get_mixed(self):
174
175
self.assertEquals(s.has_id('a'), True)
175
176
self.assertEquals(cs.get('a').read(), 'hello there')
176
177
self.assertEquals(s.get('a').read(), 'hello there')
178
179
self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
180
181
s.add(StringIO('goodbye'), 'b')
186
187
self.assertEquals(s.has_id('b'), True)
187
188
self.assertEquals(cs.get('b').read(), 'goodbye')
188
189
self.assertEquals(s.get('b').read(), 'goodbye')
190
191
self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
192
193
class MockTransport(transport.Transport):
297
298
my_store.register_suffix('dsc')
298
299
my_store.add(stream, "foo", 'dsc')
299
300
self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
301
302
def test_add_simple_suffixed(self):
302
303
stream = StringIO("content")
303
304
my_store = InstrumentedTransportStore(MockTransport(), True)
386
387
to_store = TextStore(MemoryTransport(),
387
388
prefixed=True, compressed=True)
388
389
to_store.register_suffix('sig')
389
to_store.copy_all_ids(from_store)
390
copy_all(from_store, to_store)
390
391
self.assertEqual(1, len(to_store))
391
392
self.assertEqual(set(['foo']), set(to_store.__iter__()))
392
393
self.assertEqual('content', to_store.get('foo').read())
396
397
def test_relpath_escaped(self):
    """A bare '%' file id must come back from _relpath() as '%25'."""
    backing = MemoryTransport()
    escaped_path = store.TransportStore(backing)._relpath('%')
    self.assertEqual('%25', escaped_path)
400
def test_escaped_uppercase(self):
401
"""Uppercase letters are escaped for safety on Windows"""
402
my_store = store.TransportStore(MemoryTransport(), prefixed=True,
404
# a particularly perverse file-id! :-)
405
self.assertEquals(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
408
class TestVersionFileStore(TestCaseWithTransport):
411
return self._transaction
414
super(TestVersionFileStore, self).setUp()
415
self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
416
self.vfstore.get_scope = self.get_scope
417
self._transaction = None
419
def test_get_weave_registers_dirty_in_write(self):
    """Weaves opened in a write transaction reject writes once it is over.

    The same scenario is run for both accessors, get_weave_or_empty and
    get_weave: open 'id' inside a WriteTransaction, finish the transaction,
    reset the scope to None, then expect add_lines to raise
    errors.OutSideTransaction.
    """
    for accessor_name in ('get_weave_or_empty', 'get_weave'):
        self._transaction = transactions.WriteTransaction()
        # Look the accessor up only after the transaction is in place, in
        # the same order as a direct call on self.vfstore.
        open_weave = getattr(self.vfstore, accessor_name)
        vf = open_weave('id', self._transaction)
        self._transaction.finish()
        self._transaction = None
        self.assertRaises(
            errors.OutSideTransaction, vf.add_lines, 'b', [], [])
431
def test_get_weave_readonly_cant_write(self):
    """A weave fetched under a ReadOnlyTransaction must refuse add_lines.

    'id' is first requested inside a WriteTransaction that is then
    finished; it is requested again inside a ReadOnlyTransaction, and
    add_lines on that second object is expected to raise
    errors.ReadOnlyError.
    """
    vfstore = self.vfstore

    # Pass 1: request 'id' under a write transaction, then finish it.
    self._transaction = transactions.WriteTransaction()
    weave = vfstore.get_weave_or_empty('id', self._transaction)
    self._transaction.finish()

    # Pass 2: request it again with a read-only transaction as the scope.
    self._transaction = transactions.ReadOnlyTransaction()
    weave = vfstore.get_weave_or_empty('id', self._transaction)
    self.assertRaises(errors.ReadOnlyError, weave.add_lines, 'b', [], [])
439
def test___iter__escaped(self):
# Replace self.vfstore with a VersionedFileStore created with prefixed=True
# and escaped=True, add a version to the file id ' ' inside a write
# transaction, and check that iterating the store yields exactly [' '].
440
self.vfstore = store.versioned.VersionedFileStore(MemoryTransport(),
441
prefixed=True, escaped=True)
442
# The replacement store needs the same get_scope hook pointed at this test.
self.vfstore.get_scope = self.get_scope
443
self._transaction = transactions.WriteTransaction()
444
vf = self.vfstore.get_weave_or_empty(' ', self._transaction)
445
vf.add_lines('a', [], [])
# NOTE(review): the embedded line numbering jumps from 445 to 447 here, so a
# statement may be missing from this view -- confirm against the full file.
447
self._transaction.finish()
448
self.assertEqual([' '], list(self.vfstore))