1
# Copyright (C) 2005 by Canonical Development Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Test Store implementations."""
19
from cStringIO import StringIO
22
from bzrlib.errors import BzrError, UnlistableStore
23
from bzrlib.store import copy_all
24
from bzrlib.transport.local import LocalTransport
25
from bzrlib.transport import NoSuchFile
26
from bzrlib.store.compressed_text import CompressedTextStore
27
from bzrlib.store.text import TextStore
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
29
import bzrlib.store as store
30
import bzrlib.transport as transport
31
from bzrlib.transport.memory import MemoryTransport
34
class TestStores(object):
35
"""Mixin template class that provides some common tests for stores"""
37
def check_content(self, store, fileid, value):
39
self.assertEqual(f.read(), value)
41
def fill_store(self, store):
42
store.add(StringIO('hello'), 'a')
43
store.add(StringIO('other'), 'b')
44
store.add(StringIO('something'), 'c')
45
store.add(StringIO('goodbye'), '123123')
47
def test_copy_all(self):
50
store_a = self.get_store('a')
51
store_a.add('foo', '1')
53
store_b = self.get_store('b')
54
copy_all(store_a, store_b)
55
self.assertEqual(store_a.get('1').read(), 'foo')
56
self.assertEqual(store_b.get('1').read(), 'foo')
57
# TODO: Switch the exception form UnlistableStore to
58
# or make Stores throw UnlistableStore if their
59
# Transport doesn't support listing
60
# store_c = RemoteStore('http://example.com/')
61
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
64
store = self.get_store()
65
self.fill_store(store)
67
self.check_content(store, 'a', 'hello')
68
self.check_content(store, 'b', 'other')
69
self.check_content(store, 'c', 'something')
71
# Make sure that requesting a non-existing file fails
72
self.assertRaises(KeyError, self.check_content, store, 'd', None)
74
def test_multiple_add(self):
75
"""Multiple add with same ID should raise a BzrError"""
76
store = self.get_store()
77
self.fill_store(store)
78
self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
81
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
83
def get_store(self, path='.'):
84
t = LocalTransport(path)
85
return CompressedTextStore(t)
87
def test_total_size(self):
88
store = self.get_store('.')
89
store.register_suffix('dsc')
90
store.add(StringIO('goodbye'), '123123')
91
store.add(StringIO('goodbye2'), '123123', 'dsc')
92
# these get gzipped - content should be stable
93
self.assertEqual(store.total_size(), (2, 55))
95
def test__relpath_suffixed(self):
96
my_store = CompressedTextStore(MockTransport(), True)
97
my_store.register_suffix('dsc')
98
self.assertEqual('45/foo.dsc.gz', my_store._relpath('foo', ['dsc']))
101
class TestMemoryStore(TestCase):
104
return store.ImmutableMemoryStore()
106
def test_imports(self):
107
from bzrlib.store import ImmutableMemoryStore
109
def test_add_and_retrieve(self):
110
store = self.get_store()
111
store.add(StringIO('hello'), 'aa')
112
self.assertNotEqual(store.get('aa'), None)
113
self.assertEqual(store.get('aa').read(), 'hello')
114
store.add(StringIO('hello world'), 'bb')
115
self.assertNotEqual(store.get('bb'), None)
116
self.assertEqual(store.get('bb').read(), 'hello world')
118
def test_missing_is_absent(self):
119
store = self.get_store()
120
self.failIf('aa' in store)
122
def test_adding_fails_when_present(self):
123
my_store = self.get_store()
124
my_store.add(StringIO('hello'), 'aa')
125
self.assertRaises(BzrError,
126
my_store.add, StringIO('hello'), 'aa')
128
def test_total_size(self):
129
store = self.get_store()
130
store.add(StringIO('goodbye'), '123123')
131
store.add(StringIO('goodbye2'), '123123.dsc')
132
self.assertEqual(store.total_size(), (2, 15))
133
# TODO: Switch the exception form UnlistableStore to
134
# or make Stores throw UnlistableStore if their
135
# Transport doesn't support listing
136
# store_c = RemoteStore('http://example.com/')
137
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
140
class TestTextStore(TestCaseInTempDir, TestStores):
142
def get_store(self, path='.'):
143
t = LocalTransport(path)
146
def test_total_size(self):
147
store = self.get_store()
148
store.add(StringIO('goodbye'), '123123')
149
store.add(StringIO('goodbye2'), '123123.dsc')
150
self.assertEqual(store.total_size(), (2, 15))
151
# TODO: Switch the exception form UnlistableStore to
152
# or make Stores throw UnlistableStore if their
153
# Transport doesn't support listing
154
# store_c = RemoteStore('http://example.com/')
155
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
158
class MockTransport(transport.Transport):
159
"""A fake transport for testing with."""
161
def has(self, filename):
164
def __init__(self, url=None):
166
url = "http://example.com"
167
super(MockTransport, self).__init__(url)
169
def mkdir(self, filename):
173
class InstrumentedTransportStore(store.TransportStore):
174
"""An instrumented TransportStore.
176
Here we replace template method worker methods with calls that record the
180
def _add(self, filename, file):
181
self._calls.append(("_add", filename, file))
183
def __init__(self, transport, prefixed=False):
184
super(InstrumentedTransportStore, self).__init__(transport, prefixed)
188
class TestInstrumentedTransportStore(TestCase):
190
def test__add_records(self):
191
my_store = InstrumentedTransportStore(MockTransport())
192
my_store._add("filename", "file")
193
self.assertEqual([("_add", "filename", "file")], my_store._calls)
196
class TestMockTransport(TestCase):
198
def test_isinstance(self):
199
self.failUnless(isinstance(MockTransport(), transport.Transport))
202
self.assertEqual(False, MockTransport().has('foo'))
204
def test_mkdir(self):
205
MockTransport().mkdir('45')
208
class TestTransportStore(TestCase):
210
def test__relpath_invalid(self):
211
my_store = store.TransportStore(MockTransport())
212
self.assertRaises(ValueError, my_store._relpath, '/foo')
213
self.assertRaises(ValueError, my_store._relpath, 'foo/')
215
def test_register_invalid_suffixes(self):
216
my_store = store.TransportStore(MockTransport())
217
self.assertRaises(ValueError, my_store.register_suffix, '/')
218
self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
220
def test__relpath_unregister_suffixes(self):
221
my_store = store.TransportStore(MockTransport())
222
self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
223
self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
225
def test__relpath_simple(self):
226
my_store = store.TransportStore(MockTransport())
227
self.assertEqual("foo", my_store._relpath('foo'))
229
def test__relpath_prefixed(self):
230
my_store = store.TransportStore(MockTransport(), True)
231
self.assertEqual('45/foo', my_store._relpath('foo'))
233
def test__relpath_simple_suffixed(self):
234
my_store = store.TransportStore(MockTransport())
235
my_store.register_suffix('gz')
236
my_store.register_suffix('bar')
237
self.assertEqual('foo.gz', my_store._relpath('foo', ['gz']))
238
self.assertEqual('foo.gz.bar', my_store._relpath('foo', ['gz', 'bar']))
240
def test__relpath_prefixed_suffixed(self):
241
my_store = store.TransportStore(MockTransport(), True)
242
my_store.register_suffix('gz')
243
my_store.register_suffix('bar')
244
self.assertEqual('45/foo.gz', my_store._relpath('foo', ['gz']))
245
self.assertEqual('45/foo.gz.bar',
246
my_store._relpath('foo', ['gz', 'bar']))
248
def test_add_simple(self):
249
stream = StringIO("content")
250
my_store = InstrumentedTransportStore(MockTransport())
251
my_store.add(stream, "foo")
252
self.assertEqual([("_add", "foo", stream)], my_store._calls)
254
def test_add_prefixed(self):
255
stream = StringIO("content")
256
my_store = InstrumentedTransportStore(MockTransport(), True)
257
my_store.add(stream, "foo")
258
self.assertEqual([("_add", "45/foo", stream)], my_store._calls)
260
def test_add_simple_suffixed(self):
261
stream = StringIO("content")
262
my_store = InstrumentedTransportStore(MockTransport())
263
my_store.register_suffix('dsc')
264
my_store.add(stream, "foo", 'dsc')
265
self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
267
def test_add_simple_suffixed(self):
268
stream = StringIO("content")
269
my_store = InstrumentedTransportStore(MockTransport(), True)
270
my_store.register_suffix('dsc')
271
my_store.add(stream, "foo", 'dsc')
272
self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
274
def get_populated_store(self, prefixed=False, store_class=TextStore):
275
my_store = store_class(MemoryTransport(), prefixed)
276
my_store.register_suffix('sig')
277
stream = StringIO("signature")
278
my_store.add(stream, "foo", 'sig')
279
stream = StringIO("content")
280
my_store.add(stream, "foo")
281
stream = StringIO("signature for missing base")
282
my_store.add(stream, "missing", 'sig')
285
def test_has_simple(self):
286
my_store = self.get_populated_store()
287
self.assertEqual(True, my_store.has_id('foo'))
288
my_store = self.get_populated_store(True)
289
self.assertEqual(True, my_store.has_id('foo'))
291
def test_has_suffixed(self):
292
my_store = self.get_populated_store()
293
self.assertEqual(True, my_store.has_id('foo', 'sig'))
294
my_store = self.get_populated_store(True)
295
self.assertEqual(True, my_store.has_id('foo', 'sig'))
297
def test_has_suffixed_no_base(self):
298
my_store = self.get_populated_store()
299
self.assertEqual(False, my_store.has_id('missing'))
300
my_store = self.get_populated_store(True)
301
self.assertEqual(False, my_store.has_id('missing'))
303
def test_get_simple(self):
304
my_store = self.get_populated_store()
305
self.assertEqual('content', my_store.get('foo').read())
306
my_store = self.get_populated_store(True)
307
self.assertEqual('content', my_store.get('foo').read())
309
def test_get_suffixed(self):
310
my_store = self.get_populated_store()
311
self.assertEqual('signature', my_store.get('foo', 'sig').read())
312
my_store = self.get_populated_store(True)
313
self.assertEqual('signature', my_store.get('foo', 'sig').read())
315
def test_get_suffixed_no_base(self):
316
my_store = self.get_populated_store()
317
self.assertEqual('signature for missing base',
318
my_store.get('missing', 'sig').read())
319
my_store = self.get_populated_store(True)
320
self.assertEqual('signature for missing base',
321
my_store.get('missing', 'sig').read())
323
def test___iter__no_suffix(self):
324
my_store = TextStore(MemoryTransport(), False)
325
stream = StringIO("content")
326
my_store.add(stream, "foo")
327
self.assertEqual(set(['foo']),
328
set(my_store.__iter__()))
330
def test___iter__(self):
331
self.assertEqual(set(['foo']),
332
set(self.get_populated_store().__iter__()))
333
self.assertEqual(set(['foo']),
334
set(self.get_populated_store(True).__iter__()))
336
def test___iter__compressed(self):
337
self.assertEqual(set(['foo']),
338
set(self.get_populated_store(
339
store_class=CompressedTextStore).__iter__()))
340
self.assertEqual(set(['foo']),
341
set(self.get_populated_store(
342
True, CompressedTextStore).__iter__()))
344
def test___len__(self):
345
self.assertEqual(1, len(self.get_populated_store()))
347
def test_copy_suffixes(self):
348
from_store = self.get_populated_store()
349
to_store = CompressedTextStore(MemoryTransport(), True)
350
to_store.register_suffix('sig')
351
copy_all(from_store, to_store)
352
self.assertEqual(1, len(to_store))
353
self.assertEqual(set(['foo']), set(to_store.__iter__()))
354
self.assertEqual('content', to_store.get('foo').read())
355
self.assertEqual('signature', to_store.get('foo', 'sig').read())
356
self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
358
def test_relpath_escaped(self):
359
my_store = store.TransportStore(MemoryTransport())
360
self.assertEqual('%25', my_store._relpath('%'))