13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Test Store implementations."""
23
23
import bzrlib.errors as errors
24
from bzrlib.errors import BzrError
25
from bzrlib.store import TransportStore
24
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
25
from bzrlib.transport.local import LocalTransport
26
26
from bzrlib.store.text import TextStore
27
from bzrlib.store.versioned import VersionedFileStore
28
27
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
28
import bzrlib.store as store
29
29
import bzrlib.transactions as transactions
30
30
import bzrlib.transport as transport
31
31
from bzrlib.transport.memory import MemoryTransport
32
from bzrlib.weave import WeaveFile
35
34
class TestStores(object):
64
63
def test_get(self):
65
64
store = self.get_store()
66
65
self.fill_store(store)
68
67
self.check_content(store, 'a', 'hello')
69
68
self.check_content(store, 'b', 'other')
70
69
self.check_content(store, 'c', 'something')
72
71
# Make sure that requesting a non-existing file fails
73
72
self.assertRaises(KeyError, self.check_content, store, 'd', None)
82
81
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
84
83
def get_store(self, path=u'.'):
85
t = transport.get_transport_from_path(path)
84
t = transport.get_transport(path)
86
85
return TextStore(t, compressed=True)
88
87
def test_total_size(self):
92
91
store.add(StringIO('goodbye2'), '123123', 'dsc')
93
92
# these get gzipped - content should be stable
94
93
self.assertEqual(store.total_size(), (2, 55))
96
95
def test__relpath_suffixed(self):
97
96
my_store = TextStore(MockTransport(),
98
97
prefixed=True, compressed=True)
139
138
class TestTextStore(TestCaseInTempDir, TestStores):
141
140
def get_store(self, path=u'.'):
142
t = transport.get_transport_from_path(path)
141
t = transport.get_transport(path)
143
142
return TextStore(t, compressed=False)
145
144
def test_total_size(self):
157
156
class TestMixedTextStore(TestCaseInTempDir, TestStores):
159
158
def get_store(self, path=u'.', compressed=True):
160
t = transport.get_transport_from_path(path)
159
t = transport.get_transport(path)
161
160
return TextStore(t, compressed=compressed)
163
162
def test_get_mixed(self):
165
164
s = self.get_store(u'.', compressed=False)
166
165
cs.add(StringIO('hello there'), 'a')
168
self.assertPathExists('a.gz')
169
self.assertFalse(os.path.lexists('a'))
171
self.assertEqual(gzip.GzipFile('a.gz').read(), 'hello there')
173
self.assertEqual(cs.has_id('a'), True)
174
self.assertEqual(s.has_id('a'), True)
175
self.assertEqual(cs.get('a').read(), 'hello there')
176
self.assertEqual(s.get('a').read(), 'hello there')
167
self.failUnlessExists('a.gz')
168
self.failIf(os.path.lexists('a'))
170
self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
172
self.assertEquals(cs.has_id('a'), True)
173
self.assertEquals(s.has_id('a'), True)
174
self.assertEquals(cs.get('a').read(), 'hello there')
175
self.assertEquals(s.get('a').read(), 'hello there')
178
177
self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
180
179
s.add(StringIO('goodbye'), 'b')
181
self.assertPathExists('b')
182
self.assertFalse(os.path.lexists('b.gz'))
183
self.assertEqual(open('b').read(), 'goodbye')
185
self.assertEqual(cs.has_id('b'), True)
186
self.assertEqual(s.has_id('b'), True)
187
self.assertEqual(cs.get('b').read(), 'goodbye')
188
self.assertEqual(s.get('b').read(), 'goodbye')
180
self.failUnlessExists('b')
181
self.failIf(os.path.lexists('b.gz'))
182
self.assertEquals(open('b').read(), 'goodbye')
184
self.assertEquals(cs.has_id('b'), True)
185
self.assertEquals(s.has_id('b'), True)
186
self.assertEquals(cs.get('b').read(), 'goodbye')
187
self.assertEquals(s.get('b').read(), 'goodbye')
190
189
self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
192
191
class MockTransport(transport.Transport):
242
241
class TestTransportStore(TestCase):
244
243
def test__relpath_invalid(self):
245
my_store = TransportStore(MockTransport())
244
my_store = store.TransportStore(MockTransport())
246
245
self.assertRaises(ValueError, my_store._relpath, '/foo')
247
246
self.assertRaises(ValueError, my_store._relpath, 'foo/')
249
248
def test_register_invalid_suffixes(self):
250
my_store = TransportStore(MockTransport())
249
my_store = store.TransportStore(MockTransport())
251
250
self.assertRaises(ValueError, my_store.register_suffix, '/')
252
251
self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
254
253
def test__relpath_unregister_suffixes(self):
255
my_store = TransportStore(MockTransport())
254
my_store = store.TransportStore(MockTransport())
256
255
self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
257
256
self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
259
258
def test__relpath_simple(self):
260
my_store = TransportStore(MockTransport())
259
my_store = store.TransportStore(MockTransport())
261
260
self.assertEqual("foo", my_store._relpath('foo'))
263
262
def test__relpath_prefixed(self):
264
my_store = TransportStore(MockTransport(), True)
263
my_store = store.TransportStore(MockTransport(), True)
265
264
self.assertEqual('45/foo', my_store._relpath('foo'))
267
266
def test__relpath_simple_suffixed(self):
268
my_store = TransportStore(MockTransport())
267
my_store = store.TransportStore(MockTransport())
269
268
my_store.register_suffix('bar')
270
269
my_store.register_suffix('baz')
271
270
self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
272
271
self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
274
273
def test__relpath_prefixed_suffixed(self):
275
my_store = TransportStore(MockTransport(), True)
274
my_store = store.TransportStore(MockTransport(), True)
276
275
my_store.register_suffix('bar')
277
276
my_store.register_suffix('baz')
278
277
self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
297
296
my_store.register_suffix('dsc')
298
297
my_store.add(stream, "foo", 'dsc')
299
298
self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
301
300
def test_add_simple_suffixed(self):
302
301
stream = StringIO("content")
303
302
my_store = InstrumentedTransportStore(MockTransport(), True)
394
393
self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
396
395
def test_relpath_escaped(self):
397
my_store = TransportStore(MemoryTransport())
396
my_store = store.TransportStore(MemoryTransport())
398
397
self.assertEqual('%25', my_store._relpath('%'))
400
399
def test_escaped_uppercase(self):
401
400
"""Uppercase letters are escaped for safety on Windows"""
402
my_store = TransportStore(MemoryTransport(), prefixed=True,
401
my_store = store.TransportStore(MemoryTransport(), escaped=True)
404
402
# a particularly perverse file-id! :-)
405
self.assertEqual(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
403
self.assertEquals(my_store._escape_file_id('C:<>'), '%43%3a%3c%3e')
408
406
class TestVersionFileStore(TestCaseWithTransport):
436
433
self._transaction = transactions.ReadOnlyTransaction()
437
434
vf = self.vfstore.get_weave_or_empty('id', self._transaction)
438
435
self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
440
def test___iter__escaped(self):
441
self.vfstore = VersionedFileStore(MemoryTransport(),
442
prefixed=True, escaped=True, versionedfile_class=WeaveFile)
443
self.vfstore.get_scope = self.get_scope
444
self._transaction = transactions.WriteTransaction()
445
vf = self.vfstore.get_weave_or_empty(' ', self._transaction)
446
vf.add_lines('a', [], [])
448
self._transaction.finish()
449
self.assertEqual([' '], list(self.vfstore))