1
# Copyright (C) 2005 by Canonical Development Ltd
1
# Copyright (C) 2005, 2007 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""Test Store implementations."""
23
from bzrlib.errors import BzrError, UnlistableStore
24
from bzrlib.store import copy_all
23
import bzrlib.errors as errors
24
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
25
25
from bzrlib.transport.local import LocalTransport
26
from bzrlib.transport import NoSuchFile
27
26
from bzrlib.store.text import TextStore
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
27
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
29
28
import bzrlib.store as store
29
import bzrlib.store.versioned
30
import bzrlib.transactions as transactions
30
31
import bzrlib.transport as transport
31
32
from bzrlib.transport.memory import MemoryTransport
50
51
store_a = self.get_store('a')
51
store_a.add('foo', '1')
52
store_a.add(StringIO('foo'), '1')
53
54
store_b = self.get_store('b')
54
copy_all(store_a, store_b)
55
store_b.copy_all_ids(store_a)
55
56
self.assertEqual(store_a.get('1').read(), 'foo')
56
57
self.assertEqual(store_b.get('1').read(), 'foo')
57
58
# TODO: Switch the exception form UnlistableStore to
63
64
def test_get(self):
64
65
store = self.get_store()
65
66
self.fill_store(store)
67
68
self.check_content(store, 'a', 'hello')
68
69
self.check_content(store, 'b', 'other')
69
70
self.check_content(store, 'c', 'something')
71
72
# Make sure that requesting a non-existing file fails
72
73
self.assertRaises(KeyError, self.check_content, store, 'd', None)
81
82
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
83
def get_store(self, path='.'):
84
t = LocalTransport(path)
84
def get_store(self, path=u'.'):
85
t = transport.get_transport(path)
85
86
return TextStore(t, compressed=True)
87
88
def test_total_size(self):
88
store = self.get_store('.')
89
store = self.get_store(u'.')
89
90
store.register_suffix('dsc')
90
91
store.add(StringIO('goodbye'), '123123')
91
92
store.add(StringIO('goodbye2'), '123123', 'dsc')
92
93
# these get gzipped - content should be stable
93
94
self.assertEqual(store.total_size(), (2, 55))
95
96
def test__relpath_suffixed(self):
96
my_store = TextStore(MockTransport(), True, compressed=True)
97
my_store = TextStore(MockTransport(),
98
prefixed=True, compressed=True)
97
99
my_store.register_suffix('dsc')
98
100
self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
101
103
class TestMemoryStore(TestCase):
103
105
def get_store(self):
104
return store.ImmutableMemoryStore()
106
def test_imports(self):
107
from bzrlib.store import ImmutableMemoryStore
106
return TextStore(MemoryTransport())
109
108
def test_add_and_retrieve(self):
110
109
store = self.get_store()
140
139
class TestTextStore(TestCaseInTempDir, TestStores):
142
def get_store(self, path='.'):
143
t = LocalTransport(path)
141
def get_store(self, path=u'.'):
142
t = transport.get_transport(path)
144
143
return TextStore(t, compressed=False)
146
145
def test_total_size(self):
158
157
class TestMixedTextStore(TestCaseInTempDir, TestStores):
160
def get_store(self, path='.', compressed=True):
161
t = LocalTransport(path)
159
def get_store(self, path=u'.', compressed=True):
160
t = transport.get_transport(path)
162
161
return TextStore(t, compressed=compressed)
164
163
def test_get_mixed(self):
165
cs = self.get_store('.', compressed=True)
166
s = self.get_store('.', compressed=False)
164
cs = self.get_store(u'.', compressed=True)
165
s = self.get_store(u'.', compressed=False)
167
166
cs.add(StringIO('hello there'), 'a')
169
168
self.failUnlessExists('a.gz')
175
174
self.assertEquals(s.has_id('a'), True)
176
175
self.assertEquals(cs.get('a').read(), 'hello there')
177
176
self.assertEquals(s.get('a').read(), 'hello there')
179
178
self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
181
180
s.add(StringIO('goodbye'), 'b')
187
186
self.assertEquals(s.has_id('b'), True)
188
187
self.assertEquals(cs.get('b').read(), 'goodbye')
189
188
self.assertEquals(s.get('b').read(), 'goodbye')
191
190
self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
193
192
class MockTransport(transport.Transport):
298
297
my_store.register_suffix('dsc')
299
298
my_store.add(stream, "foo", 'dsc')
300
299
self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
302
301
def test_add_simple_suffixed(self):
303
302
stream = StringIO("content")
304
303
my_store = InstrumentedTransportStore(MockTransport(), True)
358
357
my_store.get('missing', 'sig').read())
360
359
def test___iter__no_suffix(self):
361
my_store = TextStore(MemoryTransport(), False, compressed=False)
360
my_store = TextStore(MemoryTransport(),
361
prefixed=False, compressed=False)
362
362
stream = StringIO("content")
363
363
my_store.add(stream, "foo")
364
364
self.assertEqual(set(['foo']),
384
384
def test_copy_suffixes(self):
385
385
from_store = self.get_populated_store()
386
to_store = TextStore(MemoryTransport(), True, compressed=True)
386
to_store = TextStore(MemoryTransport(),
387
prefixed=True, compressed=True)
387
388
to_store.register_suffix('sig')
388
copy_all(from_store, to_store)
389
to_store.copy_all_ids(from_store)
389
390
self.assertEqual(1, len(to_store))
390
391
self.assertEqual(set(['foo']), set(to_store.__iter__()))
391
392
self.assertEqual('content', to_store.get('foo').read())
395
396
def test_relpath_escaped(self):
396
397
my_store = store.TransportStore(MemoryTransport())
397
398
self.assertEqual('%25', my_store._relpath('%'))
400
def test_escaped_uppercase(self):
401
"""Uppercase letters are escaped for safety on Windows"""
402
my_store = store.TransportStore(MemoryTransport(), prefixed=True,
404
# a particularly perverse file-id! :-)
405
self.assertEquals(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
408
class TestVersionFileStore(TestCaseWithTransport):
411
return self._transaction
414
super(TestVersionFileStore, self).setUp()
415
self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
416
self.vfstore.get_scope = self.get_scope
417
self._transaction = None
419
def test_get_weave_registers_dirty_in_write(self):
420
self._transaction = transactions.WriteTransaction()
421
vf = self.vfstore.get_weave_or_empty('id', self._transaction)
422
self._transaction.finish()
423
self._transaction = None
424
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
425
self._transaction = transactions.WriteTransaction()
426
vf = self.vfstore.get_weave('id', self._transaction)
427
self._transaction.finish()
428
self._transaction = None
429
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
431
def test_get_weave_readonly_cant_write(self):
432
self._transaction = transactions.WriteTransaction()
433
vf = self.vfstore.get_weave_or_empty('id', self._transaction)
434
self._transaction.finish()
435
self._transaction = transactions.ReadOnlyTransaction()
436
vf = self.vfstore.get_weave_or_empty('id', self._transaction)
437
self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
439
def test___iter__escaped(self):
440
self.vfstore = store.versioned.VersionedFileStore(MemoryTransport(),
441
prefixed=True, escaped=True)
442
self.vfstore.get_scope = self.get_scope
443
self._transaction = transactions.WriteTransaction()
444
vf = self.vfstore.get_weave_or_empty(' ', self._transaction)
445
vf.add_lines('a', [], [])
447
self._transaction.finish()
448
self.assertEqual([' '], list(self.vfstore))