# Copyright (C) 2005 by Canonical Development Ltd

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Test Store implementations."""
19
from cStringIO import StringIO
22
from bzrlib.errors import BzrError, UnlistableStore
23
from bzrlib.store import copy_all
24
from bzrlib.transport.local import LocalTransport
25
from bzrlib.transport import NoSuchFile
26
from bzrlib.store.compressed_text import CompressedTextStore
27
from bzrlib.store.text import TextStore
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
29
import bzrlib.store as store
30
import bzrlib.transport as transport
31
from bzrlib.transport.memory import MemoryTransport
34
class TestStores(object):
36
def check_content(self, store, fileid, value):
38
self.assertEqual(f.read(), value)
40
def fill_store(self, store):
41
store.add(StringIO('hello'), 'a')
42
store.add(StringIO('other'), 'b')
43
store.add(StringIO('something'), 'c')
44
store.add(StringIO('goodbye'), '123123')
46
def test_copy_all(self):
49
store_a = self.get_store('a')
50
store_a.add('foo', '1')
52
store_b = self.get_store('b')
53
copy_all(store_a, store_b)
54
self.assertEqual(store_a.get('1').read(), 'foo')
55
self.assertEqual(store_b.get('1').read(), 'foo')
56
# TODO: Switch the exception form UnlistableStore to
57
# or make Stores throw UnlistableStore if their
58
# Transport doesn't support listing
59
# store_c = RemoteStore('http://example.com/')
60
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
63
store = self.get_store()
64
self.fill_store(store)
66
self.check_content(store, 'a', 'hello')
67
self.check_content(store, 'b', 'other')
68
self.check_content(store, 'c', 'something')
70
# Make sure that requesting a non-existing file fails
71
self.assertRaises(KeyError, self.check_content, store, 'd', None)
73
def test_multiple_add(self):
74
"""Multiple add with same ID should raise a BzrError"""
75
store = self.get_store()
76
self.fill_store(store)
77
self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
80
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
    """Run the shared store tests against a CompressedTextStore."""

    def get_store(self, path='.'):
        """Return a CompressedTextStore on a LocalTransport at ``path``."""
        t = LocalTransport(path)
        return CompressedTextStore(t)

    def test_total_size(self):
        """total_size() counts both the plain and the suffixed entry."""
        my_store = self.get_store('.')
        my_store.register_suffix('dsc')
        my_store.add(StringIO('goodbye'), '123123')
        my_store.add(StringIO('goodbye2'), '123123', 'dsc')
        # these get gzipped - content should be stable
        self.assertEqual(my_store.total_size(), (2, 55))

    def test__relpath_suffixed(self):
        """A prefixed compressed store appends '.gz' after the suffix."""
        my_store = CompressedTextStore(MockTransport(), True)
        my_store.register_suffix('dsc')
        self.assertEqual('45/foo.dsc.gz', my_store._relpath('foo', ['dsc']))
100
class TestMemoryStore(TestCase):
    """Tests for ImmutableMemoryStore."""

    def get_store(self):
        """Return a new, empty ImmutableMemoryStore."""
        return store.ImmutableMemoryStore()

    def test_imports(self):
        """ImmutableMemoryStore is importable directly from bzrlib.store."""
        from bzrlib.store import ImmutableMemoryStore

    def test_add_and_retrieve(self):
        """Entries added under distinct ids can each be read back."""
        # Locals are named my_store so they do not shadow the module-level
        # ``store`` alias for bzrlib.store.
        my_store = self.get_store()
        my_store.add(StringIO('hello'), 'aa')
        self.assertNotEqual(my_store.get('aa'), None)
        self.assertEqual(my_store.get('aa').read(), 'hello')
        my_store.add(StringIO('hello world'), 'bb')
        self.assertNotEqual(my_store.get('bb'), None)
        self.assertEqual(my_store.get('bb').read(), 'hello world')

    def test_missing_is_absent(self):
        """An id that was never added is not ``in`` the store."""
        my_store = self.get_store()
        self.failIf('aa' in my_store)

    def test_adding_fails_when_present(self):
        """Adding under an id that already exists raises BzrError."""
        my_store = self.get_store()
        my_store.add(StringIO('hello'), 'aa')
        self.assertRaises(BzrError,
                          my_store.add, StringIO('hello'), 'aa')

    def test_total_size(self):
        """total_size() reports (2, 15) for 7 + 8 bytes of content."""
        my_store = self.get_store()
        my_store.add(StringIO('goodbye'), '123123')
        my_store.add(StringIO('goodbye2'), '123123.dsc')
        self.assertEqual(my_store.total_size(), (2, 15))
        # TODO: Switch the exception from UnlistableStore to (target not
        #       recorded in the original note), or make Stores throw
        #       UnlistableStore if their Transport doesn't support listing
        # store_c = RemoteStore('http://example.com/')
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
139
class TestTextStore(TestCaseInTempDir, TestStores):
    """Run the shared store tests against a TextStore."""

    def get_store(self, path='.'):
        """Return a TextStore on a LocalTransport at ``path``."""
        t = LocalTransport(path)
        # Without this return every test would receive None instead of a
        # store; mirrors TestCompressedTextStore.get_store.
        return TextStore(t)

    def test_total_size(self):
        """total_size() reports (2, 15) for 7 + 8 bytes of content."""
        my_store = self.get_store()
        my_store.add(StringIO('goodbye'), '123123')
        my_store.add(StringIO('goodbye2'), '123123.dsc')
        self.assertEqual(my_store.total_size(), (2, 15))
        # TODO: Switch the exception from UnlistableStore to (target not
        #       recorded in the original note), or make Stores throw
        #       UnlistableStore if their Transport doesn't support listing
        # store_c = RemoteStore('http://example.com/')
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
157
class MockTransport(transport.Transport):
    """A fake transport for testing with."""

    def has(self, filename):
        """Report ``filename`` as absent; this fake holds no files."""
        return False

    def __init__(self, url=None):
        """Create the transport, defaulting ``url`` to http://example.com."""
        # Only fall back to the placeholder when the caller gave no url, so
        # an explicitly supplied url is not discarded.
        if url is None:
            url = "http://example.com"
        super(MockTransport, self).__init__(url)

    def mkdir(self, filename):
        """Accept the request and do nothing."""
        return
172
class InstrumentedTransportStore(store.TransportStore):
    """An instrumented TransportStore.

    Here we replace template method worker methods with calls that record the
    arguments they were given, so tests can inspect ``_calls`` afterwards.
    """

    def _add(self, filename, file):
        """Record the call as ("_add", filename, file) instead of storing."""
        self._calls.append(("_add", filename, file))

    def __init__(self, transport, prefixed=False):
        """Initialise the base store and start with an empty call log."""
        super(InstrumentedTransportStore, self).__init__(transport, prefixed)
        # _add() appends to this list, so it must exist before first use.
        self._calls = []
187
class TestInstrumentedTransportStore(TestCase):
    """Check that the instrumented store really records its calls."""

    def test__add_records(self):
        """_add() appends a ("_add", filename, file) tuple to _calls."""
        my_store = InstrumentedTransportStore(MockTransport())
        my_store._add("filename", "file")
        self.assertEqual([("_add", "filename", "file")], my_store._calls)
195
class TestMockTransport(TestCase):
    """Sanity checks for the MockTransport test double."""

    def test_isinstance(self):
        """MockTransport is a genuine Transport subclass instance."""
        self.failUnless(isinstance(MockTransport(), transport.Transport))

    def test_has(self):
        """has() answers False for a file name."""
        self.assertEqual(False, MockTransport().has('foo'))

    def test_mkdir(self):
        """mkdir() can be called without raising."""
        MockTransport().mkdir('45')
207
class TestTransportStore(TestCase):
    """Tests for TransportStore path mapping, add, lookup and iteration."""

    def test__relpath_invalid(self):
        """Ids with a leading or trailing '/' are rejected."""
        my_store = store.TransportStore(MockTransport())
        self.assertRaises(ValueError, my_store._relpath, '/foo')
        self.assertRaises(ValueError, my_store._relpath, 'foo/')

    def test_register_invalid_suffixes(self):
        """Suffixes containing '/' cannot be registered."""
        my_store = store.TransportStore(MockTransport())
        self.assertRaises(ValueError, my_store.register_suffix, '/')
        self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')

    def test__relpath_unregister_suffixes(self):
        """Asking for a path with an unregistered suffix is an error."""
        my_store = store.TransportStore(MockTransport())
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])

    def test__relpath_simple(self):
        """An unprefixed store maps an id to itself."""
        my_store = store.TransportStore(MockTransport())
        self.assertEqual("foo", my_store._relpath('foo'))

    def test__relpath_prefixed(self):
        """A prefixed store maps 'foo' to '45/foo'."""
        my_store = store.TransportStore(MockTransport(), True)
        self.assertEqual('45/foo', my_store._relpath('foo'))

    def test__relpath_simple_suffixed(self):
        """Registered suffixes are appended in the order requested."""
        my_store = store.TransportStore(MockTransport())
        my_store.register_suffix('gz')
        my_store.register_suffix('bar')
        self.assertEqual('foo.gz', my_store._relpath('foo', ['gz']))
        self.assertEqual('foo.gz.bar', my_store._relpath('foo', ['gz', 'bar']))

    def test__relpath_prefixed_suffixed(self):
        """Prefix and suffixes combine: '45/foo.gz', '45/foo.gz.bar'."""
        my_store = store.TransportStore(MockTransport(), True)
        my_store.register_suffix('gz')
        my_store.register_suffix('bar')
        self.assertEqual('45/foo.gz', my_store._relpath('foo', ['gz']))
        self.assertEqual('45/foo.gz.bar',
                         my_store._relpath('foo', ['gz', 'bar']))

    def test_add_simple(self):
        """add() hands the plain name and the stream to _add()."""
        stream = StringIO("content")
        my_store = InstrumentedTransportStore(MockTransport())
        my_store.add(stream, "foo")
        self.assertEqual([("_add", "foo", stream)], my_store._calls)

    def test_add_prefixed(self):
        """On a prefixed store add() hands the prefixed name to _add()."""
        stream = StringIO("content")
        my_store = InstrumentedTransportStore(MockTransport(), True)
        my_store.add(stream, "foo")
        self.assertEqual([("_add", "45/foo", stream)], my_store._calls)

    def test_add_simple_suffixed(self):
        """add() with a suffix hands 'foo.dsc' to _add()."""
        stream = StringIO("content")
        my_store = InstrumentedTransportStore(MockTransport())
        my_store.register_suffix('dsc')
        my_store.add(stream, "foo", 'dsc')
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)

    # This test used to share the name test_add_simple_suffixed with the one
    # above, so it replaced it in the class and the unprefixed case never ran.
    def test_add_prefixed_suffixed(self):
        """Prefixed add() with a suffix hands '45/foo.dsc' to _add()."""
        stream = StringIO("content")
        my_store = InstrumentedTransportStore(MockTransport(), True)
        my_store.register_suffix('dsc')
        my_store.add(stream, "foo", 'dsc')
        self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)

    def get_populated_store(self, prefixed=False, store_class=TextStore):
        """Return a ``store_class`` on a MemoryTransport with fixed content.

        The store holds 'foo' (content), 'foo' with suffix 'sig', and
        'missing' with suffix 'sig' only, i.e. a signature whose base entry
        was never added.
        """
        my_store = store_class(MemoryTransport(), prefixed)
        my_store.register_suffix('sig')
        stream = StringIO("signature")
        my_store.add(stream, "foo", 'sig')
        stream = StringIO("content")
        my_store.add(stream, "foo")
        stream = StringIO("signature for missing base")
        my_store.add(stream, "missing", 'sig')
        # Every caller uses the result, so the store must be returned.
        return my_store

    def test_has_simple(self):
        """has_id() finds a plain entry, prefixed or not."""
        my_store = self.get_populated_store()
        self.assertEqual(True, my_store.has_id('foo'))
        my_store = self.get_populated_store(True)
        self.assertEqual(True, my_store.has_id('foo'))

    def test_has_suffixed(self):
        """has_id() finds a suffixed entry, prefixed or not."""
        my_store = self.get_populated_store()
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
        my_store = self.get_populated_store(True)
        self.assertEqual(True, my_store.has_id('foo', 'sig'))

    def test_has_suffixed_no_base(self):
        """A suffixed entry alone does not make the base id present."""
        my_store = self.get_populated_store()
        self.assertEqual(False, my_store.has_id('missing'))
        my_store = self.get_populated_store(True)
        self.assertEqual(False, my_store.has_id('missing'))

    def test_get_simple(self):
        """get() returns the base content, prefixed or not."""
        my_store = self.get_populated_store()
        self.assertEqual('content', my_store.get('foo').read())
        my_store = self.get_populated_store(True)
        self.assertEqual('content', my_store.get('foo').read())

    def test_get_suffixed(self):
        """get() with a suffix returns the suffixed content."""
        my_store = self.get_populated_store()
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
        my_store = self.get_populated_store(True)
        self.assertEqual('signature', my_store.get('foo', 'sig').read())

    def test_get_suffixed_no_base(self):
        """A suffixed entry is readable even when its base id is absent."""
        my_store = self.get_populated_store()
        self.assertEqual('signature for missing base',
                         my_store.get('missing', 'sig').read())
        my_store = self.get_populated_store(True)
        self.assertEqual('signature for missing base',
                         my_store.get('missing', 'sig').read())

    def test___iter__no_suffix(self):
        """Iterating a store with no suffixes yields the added id."""
        my_store = TextStore(MemoryTransport(), False)
        stream = StringIO("content")
        my_store.add(stream, "foo")
        self.assertEqual(set(['foo']),
                         set(my_store.__iter__()))

    def test___iter__(self):
        """Iteration yields only 'foo' from the populated store."""
        self.assertEqual(set(['foo']),
                         set(self.get_populated_store().__iter__()))
        self.assertEqual(set(['foo']),
                         set(self.get_populated_store(True).__iter__()))

    def test___iter__compressed(self):
        """Iteration yields only 'foo' for CompressedTextStore as well."""
        self.assertEqual(set(['foo']),
                         set(self.get_populated_store(
                             store_class=CompressedTextStore).__iter__()))
        self.assertEqual(set(['foo']),
                         set(self.get_populated_store(
                             True, CompressedTextStore).__iter__()))

    def test___len__(self):
        """len() of the populated store is 1."""
        self.assertEqual(1, len(self.get_populated_store()))

    def test_copy_suffixes(self):
        """copy_all() copies 'foo' and its 'sig', but not the orphan sig."""
        from_store = self.get_populated_store()
        to_store = CompressedTextStore(MemoryTransport(), True)
        to_store.register_suffix('sig')
        copy_all(from_store, to_store)
        self.assertEqual(1, len(to_store))
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
        self.assertEqual('content', to_store.get('foo').read())
        self.assertEqual('signature', to_store.get('foo', 'sig').read())
        self.assertRaises(KeyError, to_store.get, 'missing', 'sig')