1
# Copyright (C) 2005 by Canonical Development Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Test Store implementations."""
19
from cStringIO import StringIO
23
from bzrlib.errors import BzrError, UnlistableStore
24
from bzrlib.store import copy_all
25
from bzrlib.transport.local import LocalTransport
26
from bzrlib.transport import NoSuchFile
27
from bzrlib.store.text import TextStore
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
29
import bzrlib.store as store
30
import bzrlib.transport as transport
31
from bzrlib.transport.memory import MemoryTransport
34
class TestStores(object):
35
"""Mixin template class that provides some common tests for stores"""
37
def check_content(self, store, fileid, value):
39
self.assertEqual(f.read(), value)
41
def fill_store(self, store):
42
store.add(StringIO('hello'), 'a')
43
store.add(StringIO('other'), 'b')
44
store.add(StringIO('something'), 'c')
45
store.add(StringIO('goodbye'), '123123')
47
def test_copy_all(self):
50
store_a = self.get_store('a')
51
store_a.add('foo', '1')
53
store_b = self.get_store('b')
54
copy_all(store_a, store_b)
55
self.assertEqual(store_a.get('1').read(), 'foo')
56
self.assertEqual(store_b.get('1').read(), 'foo')
57
# TODO: Switch the exception form UnlistableStore to
58
# or make Stores throw UnlistableStore if their
59
# Transport doesn't support listing
60
# store_c = RemoteStore('http://example.com/')
61
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
64
store = self.get_store()
65
self.fill_store(store)
67
self.check_content(store, 'a', 'hello')
68
self.check_content(store, 'b', 'other')
69
self.check_content(store, 'c', 'something')
71
# Make sure that requesting a non-existing file fails
72
self.assertRaises(KeyError, self.check_content, store, 'd', None)
74
def test_multiple_add(self):
75
"""Multiple add with same ID should raise a BzrError"""
76
store = self.get_store()
77
self.fill_store(store)
78
self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
81
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
83
def get_store(self, path='.'):
84
t = LocalTransport(path)
85
return TextStore(t, compressed=True)
87
def test_total_size(self):
88
store = self.get_store('.')
89
store.register_suffix('dsc')
90
store.add(StringIO('goodbye'), '123123')
91
store.add(StringIO('goodbye2'), '123123', 'dsc')
92
# these get gzipped - content should be stable
93
self.assertEqual(store.total_size(), (2, 55))
95
def test__relpath_suffixed(self):
96
my_store = TextStore(MockTransport(), True, compressed=True)
97
my_store.register_suffix('dsc')
98
self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
101
class TestMemoryStore(TestCase):
104
return store.ImmutableMemoryStore()
106
def test_imports(self):
107
from bzrlib.store import ImmutableMemoryStore
109
def test_add_and_retrieve(self):
110
store = self.get_store()
111
store.add(StringIO('hello'), 'aa')
112
self.assertNotEqual(store.get('aa'), None)
113
self.assertEqual(store.get('aa').read(), 'hello')
114
store.add(StringIO('hello world'), 'bb')
115
self.assertNotEqual(store.get('bb'), None)
116
self.assertEqual(store.get('bb').read(), 'hello world')
118
def test_missing_is_absent(self):
119
store = self.get_store()
120
self.failIf('aa' in store)
122
def test_adding_fails_when_present(self):
123
my_store = self.get_store()
124
my_store.add(StringIO('hello'), 'aa')
125
self.assertRaises(BzrError,
126
my_store.add, StringIO('hello'), 'aa')
128
def test_total_size(self):
129
store = self.get_store()
130
store.add(StringIO('goodbye'), '123123')
131
store.add(StringIO('goodbye2'), '123123.dsc')
132
self.assertEqual(store.total_size(), (2, 15))
133
# TODO: Switch the exception form UnlistableStore to
134
# or make Stores throw UnlistableStore if their
135
# Transport doesn't support listing
136
# store_c = RemoteStore('http://example.com/')
137
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
140
class TestTextStore(TestCaseInTempDir, TestStores):
142
def get_store(self, path='.'):
143
t = LocalTransport(path)
144
return TextStore(t, compressed=False)
146
def test_total_size(self):
147
store = self.get_store()
148
store.add(StringIO('goodbye'), '123123')
149
store.add(StringIO('goodbye2'), '123123.dsc')
150
self.assertEqual(store.total_size(), (2, 15))
151
# TODO: Switch the exception form UnlistableStore to
152
# or make Stores throw UnlistableStore if their
153
# Transport doesn't support listing
154
# store_c = RemoteStore('http://example.com/')
155
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
158
class TestMixedTextStore(TestCaseInTempDir, TestStores):
160
def get_store(self, path='.', compressed=True):
161
t = LocalTransport(path)
162
return TextStore(t, compressed=compressed)
164
def test_get_mixed(self):
165
cs = self.get_store('.', compressed=True)
166
s = self.get_store('.', compressed=False)
167
cs.add(StringIO('hello there'), 'a')
169
self.failUnlessExists('a.gz')
170
self.failIf(os.path.lexists('a'))
172
self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
174
self.assertEquals(cs.has_id('a'), True)
175
self.assertEquals(s.has_id('a'), True)
176
self.assertEquals(cs.get('a').read(), 'hello there')
177
self.assertEquals(s.get('a').read(), 'hello there')
179
self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
181
s.add(StringIO('goodbye'), 'b')
182
self.failUnlessExists('b')
183
self.failIf(os.path.lexists('b.gz'))
184
self.assertEquals(open('b').read(), 'goodbye')
186
self.assertEquals(cs.has_id('b'), True)
187
self.assertEquals(s.has_id('b'), True)
188
self.assertEquals(cs.get('b').read(), 'goodbye')
189
self.assertEquals(s.get('b').read(), 'goodbye')
191
self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
193
class MockTransport(transport.Transport):
194
"""A fake transport for testing with."""
196
def has(self, filename):
199
def __init__(self, url=None):
201
url = "http://example.com"
202
super(MockTransport, self).__init__(url)
204
def mkdir(self, filename):
208
class InstrumentedTransportStore(store.TransportStore):
209
"""An instrumented TransportStore.
211
Here we replace template method worker methods with calls that record the
215
def _add(self, filename, file):
216
self._calls.append(("_add", filename, file))
218
def __init__(self, transport, prefixed=False):
219
super(InstrumentedTransportStore, self).__init__(transport, prefixed)
223
class TestInstrumentedTransportStore(TestCase):
225
def test__add_records(self):
226
my_store = InstrumentedTransportStore(MockTransport())
227
my_store._add("filename", "file")
228
self.assertEqual([("_add", "filename", "file")], my_store._calls)
231
class TestMockTransport(TestCase):
233
def test_isinstance(self):
234
self.failUnless(isinstance(MockTransport(), transport.Transport))
237
self.assertEqual(False, MockTransport().has('foo'))
239
def test_mkdir(self):
240
MockTransport().mkdir('45')
243
class TestTransportStore(TestCase):
245
def test__relpath_invalid(self):
246
my_store = store.TransportStore(MockTransport())
247
self.assertRaises(ValueError, my_store._relpath, '/foo')
248
self.assertRaises(ValueError, my_store._relpath, 'foo/')
250
def test_register_invalid_suffixes(self):
251
my_store = store.TransportStore(MockTransport())
252
self.assertRaises(ValueError, my_store.register_suffix, '/')
253
self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
255
def test__relpath_unregister_suffixes(self):
256
my_store = store.TransportStore(MockTransport())
257
self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
258
self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
260
def test__relpath_simple(self):
261
my_store = store.TransportStore(MockTransport())
262
self.assertEqual("foo", my_store._relpath('foo'))
264
def test__relpath_prefixed(self):
265
my_store = store.TransportStore(MockTransport(), True)
266
self.assertEqual('45/foo', my_store._relpath('foo'))
268
def test__relpath_simple_suffixed(self):
269
my_store = store.TransportStore(MockTransport())
270
my_store.register_suffix('bar')
271
my_store.register_suffix('baz')
272
self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
273
self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
275
def test__relpath_prefixed_suffixed(self):
276
my_store = store.TransportStore(MockTransport(), True)
277
my_store.register_suffix('bar')
278
my_store.register_suffix('baz')
279
self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
280
self.assertEqual('45/foo.bar.baz',
281
my_store._relpath('foo', ['bar', 'baz']))
283
def test_add_simple(self):
284
stream = StringIO("content")
285
my_store = InstrumentedTransportStore(MockTransport())
286
my_store.add(stream, "foo")
287
self.assertEqual([("_add", "foo", stream)], my_store._calls)
289
def test_add_prefixed(self):
290
stream = StringIO("content")
291
my_store = InstrumentedTransportStore(MockTransport(), True)
292
my_store.add(stream, "foo")
293
self.assertEqual([("_add", "45/foo", stream)], my_store._calls)
295
def test_add_simple_suffixed(self):
296
stream = StringIO("content")
297
my_store = InstrumentedTransportStore(MockTransport())
298
my_store.register_suffix('dsc')
299
my_store.add(stream, "foo", 'dsc')
300
self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
302
def test_add_simple_suffixed(self):
303
stream = StringIO("content")
304
my_store = InstrumentedTransportStore(MockTransport(), True)
305
my_store.register_suffix('dsc')
306
my_store.add(stream, "foo", 'dsc')
307
self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
309
def get_populated_store(self, prefixed=False,
310
store_class=TextStore, compressed=False):
311
my_store = store_class(MemoryTransport(), prefixed,
312
compressed=compressed)
313
my_store.register_suffix('sig')
314
stream = StringIO("signature")
315
my_store.add(stream, "foo", 'sig')
316
stream = StringIO("content")
317
my_store.add(stream, "foo")
318
stream = StringIO("signature for missing base")
319
my_store.add(stream, "missing", 'sig')
322
def test_has_simple(self):
323
my_store = self.get_populated_store()
324
self.assertEqual(True, my_store.has_id('foo'))
325
my_store = self.get_populated_store(True)
326
self.assertEqual(True, my_store.has_id('foo'))
328
def test_has_suffixed(self):
329
my_store = self.get_populated_store()
330
self.assertEqual(True, my_store.has_id('foo', 'sig'))
331
my_store = self.get_populated_store(True)
332
self.assertEqual(True, my_store.has_id('foo', 'sig'))
334
def test_has_suffixed_no_base(self):
335
my_store = self.get_populated_store()
336
self.assertEqual(False, my_store.has_id('missing'))
337
my_store = self.get_populated_store(True)
338
self.assertEqual(False, my_store.has_id('missing'))
340
def test_get_simple(self):
341
my_store = self.get_populated_store()
342
self.assertEqual('content', my_store.get('foo').read())
343
my_store = self.get_populated_store(True)
344
self.assertEqual('content', my_store.get('foo').read())
346
def test_get_suffixed(self):
347
my_store = self.get_populated_store()
348
self.assertEqual('signature', my_store.get('foo', 'sig').read())
349
my_store = self.get_populated_store(True)
350
self.assertEqual('signature', my_store.get('foo', 'sig').read())
352
def test_get_suffixed_no_base(self):
353
my_store = self.get_populated_store()
354
self.assertEqual('signature for missing base',
355
my_store.get('missing', 'sig').read())
356
my_store = self.get_populated_store(True)
357
self.assertEqual('signature for missing base',
358
my_store.get('missing', 'sig').read())
360
def test___iter__no_suffix(self):
361
my_store = TextStore(MemoryTransport(), False, compressed=False)
362
stream = StringIO("content")
363
my_store.add(stream, "foo")
364
self.assertEqual(set(['foo']),
365
set(my_store.__iter__()))
367
def test___iter__(self):
368
self.assertEqual(set(['foo']),
369
set(self.get_populated_store().__iter__()))
370
self.assertEqual(set(['foo']),
371
set(self.get_populated_store(True).__iter__()))
373
def test___iter__compressed(self):
374
self.assertEqual(set(['foo']),
375
set(self.get_populated_store(
376
compressed=True).__iter__()))
377
self.assertEqual(set(['foo']),
378
set(self.get_populated_store(
379
True, compressed=True).__iter__()))
381
def test___len__(self):
382
self.assertEqual(1, len(self.get_populated_store()))
384
def test_copy_suffixes(self):
385
from_store = self.get_populated_store()
386
to_store = TextStore(MemoryTransport(), True, compressed=True)
387
to_store.register_suffix('sig')
388
copy_all(from_store, to_store)
389
self.assertEqual(1, len(to_store))
390
self.assertEqual(set(['foo']), set(to_store.__iter__()))
391
self.assertEqual('content', to_store.get('foo').read())
392
self.assertEqual('signature', to_store.get('foo', 'sig').read())
393
self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
395
def test_relpath_escaped(self):
396
my_store = store.TransportStore(MemoryTransport())
397
self.assertEqual('%25', my_store._relpath('%'))