1
# Copyright (C) 2005, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Test Store implementations."""
19
from cStringIO import StringIO
import gzip
import os

import bzrlib.errors as errors
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
from bzrlib.transport.local import LocalTransport
from bzrlib.store.text import TextStore
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
import bzrlib.store as store
import bzrlib.transactions as transactions
import bzrlib.transport as transport
from bzrlib.transport.memory import MemoryTransport
34
class TestStores(object):
    """Mixin template class that provides some common tests for stores.

    Concrete test classes combine this mixin with a TestCase class and
    supply ``get_store([path])``, which the tests below call to obtain the
    store under test.
    """

    def check_content(self, store, fileid, value):
        """Assert that ``fileid`` in ``store`` reads back as ``value``.

        Whatever ``store.get`` raises for an unknown id propagates to the
        caller; test_get relies on that to check for KeyError.
        """
        f = store.get(fileid)
        self.assertEqual(f.read(), value)

    def fill_store(self, store):
        """Add four fixed entries ('a', 'b', 'c' and '123123') to ``store``."""
        store.add(StringIO('hello'), 'a')
        store.add(StringIO('other'), 'b')
        store.add(StringIO('something'), 'c')
        store.add(StringIO('goodbye'), '123123')

    def test_copy_all(self):
        """copy_all_ids makes the source's entries readable from the target."""
        store_a = self.get_store('a')
        store_a.add(StringIO('foo'), '1')

        store_b = self.get_store('b')
        store_b.copy_all_ids(store_a)
        self.assertEqual(store_a.get('1').read(), 'foo')
        self.assertEqual(store_b.get('1').read(), 'foo')
        # TODO: Either stop expecting UnlistableStore here, or make Stores
        # throw UnlistableStore if their Transport doesn't support listing
        # store_c = RemoteStore('http://example.com/')
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)

    def test_get(self):
        """Stored entries read back unchanged; unknown ids raise KeyError."""
        store = self.get_store()
        self.fill_store(store)

        self.check_content(store, 'a', 'hello')
        self.check_content(store, 'b', 'other')
        self.check_content(store, 'c', 'something')

        # Make sure that requesting a non-existing file fails
        self.assertRaises(KeyError, self.check_content, store, 'd', None)

    def test_multiple_add(self):
        """Multiple add with same ID should raise a BzrError"""
        store = self.get_store()
        self.fill_store(store)
        self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
81
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
    """Run the shared store tests against a compressed TextStore."""

    def get_store(self, path=u'.'):
        """Return a compressed TextStore over the transport for ``path``."""
        return TextStore(transport.get_transport(path), compressed=True)

    def test_total_size(self):
        """total_size counts a base entry and its suffixed sibling."""
        text_store = self.get_store(u'.')
        text_store.register_suffix('dsc')
        for content, extra_args in (('goodbye', ('123123',)),
                                    ('goodbye2', ('123123', 'dsc'))):
            text_store.add(StringIO(content), *extra_args)
        # these get gzipped - content should be stable
        measured = text_store.total_size()
        self.assertEqual(measured, (2, 55))

    def test__relpath_suffixed(self):
        """A prefixed store maps ('foo', ['dsc']) to '45/foo.dsc'."""
        suffixed_store = TextStore(MockTransport(),
                                   prefixed=True,
                                   compressed=True)
        suffixed_store.register_suffix('dsc')
        relpath = suffixed_store._relpath('foo', ['dsc'])
        self.assertEqual('45/foo.dsc', relpath)
102
class TestMemoryStore(TestCase):
    """Tests for a TextStore backed by a MemoryTransport."""

    def get_store(self):
        """Return a fresh TextStore over a new MemoryTransport."""
        return TextStore(MemoryTransport())

    def test_add_and_retrieve(self):
        """Entries added to the store can be fetched and read back."""
        store = self.get_store()
        store.add(StringIO('hello'), 'aa')
        self.assertNotEqual(store.get('aa'), None)
        self.assertEqual(store.get('aa').read(), 'hello')
        store.add(StringIO('hello world'), 'bb')
        self.assertNotEqual(store.get('bb'), None)
        self.assertEqual(store.get('bb').read(), 'hello world')

    def test_missing_is_absent(self):
        """An id that was never added is not ``in`` the store."""
        store = self.get_store()
        self.failIf('aa' in store)

    def test_adding_fails_when_present(self):
        """Adding an id that already exists raises BzrError."""
        my_store = self.get_store()
        my_store.add(StringIO('hello'), 'aa')
        self.assertRaises(BzrError,
                          my_store.add, StringIO('hello'), 'aa')

    def test_total_size(self):
        """total_size reports (2, 15) for the 7- and 8-byte entries added."""
        store = self.get_store()
        store.add(StringIO('goodbye'), '123123')
        store.add(StringIO('goodbye2'), '123123.dsc')
        self.assertEqual(store.total_size(), (2, 15))
        # TODO: Either stop expecting UnlistableStore here, or make Stores
        # throw UnlistableStore if their Transport doesn't support listing
        # store_c = RemoteStore('http://example.com/')
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
138
class TestTextStore(TestCaseInTempDir, TestStores):
    """Run the shared store tests against an uncompressed TextStore."""

    def get_store(self, path=u'.'):
        """Return an uncompressed TextStore over the transport for ``path``."""
        return TextStore(transport.get_transport(path), compressed=False)

    def test_total_size(self):
        """total_size reports (2, 15) after adding the two entries below."""
        text_store = self.get_store()
        for content, file_id in (('goodbye', '123123'),
                                 ('goodbye2', '123123.dsc')):
            text_store.add(StringIO(content), file_id)
        measured = text_store.total_size()
        self.assertEqual(measured, (2, 15))
        # TODO: Either stop expecting UnlistableStore here, or make Stores
        # throw UnlistableStore if their Transport doesn't support listing
        # store_c = RemoteStore('http://example.com/')
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
156
class TestMixedTextStore(TestCaseInTempDir, TestStores):
    """Tests for compressed and uncompressed TextStores sharing a directory."""

    def get_store(self, path=u'.', compressed=True):
        """Return a TextStore over ``path`` with the requested compression."""
        t = transport.get_transport(path)
        return TextStore(t, compressed=compressed)

    def test_get_mixed(self):
        """Each store can read, and refuses to re-add, the other's entries."""
        cs = self.get_store(u'.', compressed=True)
        s = self.get_store(u'.', compressed=False)
        cs.add(StringIO('hello there'), 'a')

        self.failUnlessExists('a.gz')
        self.failIf(os.path.lexists('a'))

        # Close the handle explicitly so nothing in the temp dir stays open.
        gz_file = gzip.GzipFile('a.gz')
        try:
            self.assertEquals(gz_file.read(), 'hello there')
        finally:
            gz_file.close()

        self.assertEquals(cs.has_id('a'), True)
        self.assertEquals(s.has_id('a'), True)
        self.assertEquals(cs.get('a').read(), 'hello there')
        self.assertEquals(s.get('a').read(), 'hello there')

        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')

        s.add(StringIO('goodbye'), 'b')
        self.failUnlessExists('b')
        self.failIf(os.path.lexists('b.gz'))
        plain_file = open('b')
        try:
            self.assertEquals(plain_file.read(), 'goodbye')
        finally:
            plain_file.close()

        self.assertEquals(cs.has_id('b'), True)
        self.assertEquals(s.has_id('b'), True)
        self.assertEquals(cs.get('b').read(), 'goodbye')
        self.assertEquals(s.get('b').read(), 'goodbye')

        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
191
class MockTransport(transport.Transport):
    """A fake transport for testing with."""

    def has(self, filename):
        """Report every file as absent; the mock holds no content."""
        return False

    def __init__(self, url=None):
        """Create the transport, defaulting ``url`` to http://example.com."""
        # Only substitute the placeholder when the caller gave no URL, so an
        # explicit argument is honoured rather than silently discarded.
        if url is None:
            url = "http://example.com"
        super(MockTransport, self).__init__(url)

    def mkdir(self, filename):
        """Accept the request without creating anything."""
        return
206
class InstrumentedTransportStore(store.TransportStore):
    """An instrumented TransportStore.

    Here we replace template method worker methods with calls that record
    what they were asked to do; tests inspect the ``_calls`` list.
    """

    def _add(self, filename, file):
        """Record the request as an ("_add", filename, file) tuple."""
        self._calls.append(("_add", filename, file))

    def __init__(self, transport, prefixed=False):
        """Initialise the base store and start with an empty call log."""
        super(InstrumentedTransportStore, self).__init__(transport, prefixed)
        self._calls = []
221
class TestInstrumentedTransportStore(TestCase):
    """Check that the instrumented store logs what it is asked to do."""

    def test__add_records(self):
        """A direct _add call appears in the store's call log."""
        recorder = InstrumentedTransportStore(MockTransport())
        recorder._add("filename", "file")
        expected_log = [("_add", "filename", "file")]
        self.assertEqual(expected_log, recorder._calls)
229
class TestMockTransport(TestCase):
    """Sanity checks for the MockTransport test double."""

    def test_isinstance(self):
        """MockTransport is a transport.Transport."""
        self.failUnless(isinstance(MockTransport(), transport.Transport))

    def test_has(self):
        """MockTransport reports files as absent."""
        self.assertEqual(False, MockTransport().has('foo'))

    def test_mkdir(self):
        """mkdir can be called without raising."""
        MockTransport().mkdir('45')
241
class TestTransportStore(TestCase):
    """Tests for TransportStore path mapping, add, lookup and iteration."""

    def test__relpath_invalid(self):
        """File ids with a leading or trailing slash are rejected."""
        my_store = store.TransportStore(MockTransport())
        self.assertRaises(ValueError, my_store._relpath, '/foo')
        self.assertRaises(ValueError, my_store._relpath, 'foo/')

    def test_register_invalid_suffixes(self):
        """Suffixes containing a slash cannot be registered."""
        my_store = store.TransportStore(MockTransport())
        self.assertRaises(ValueError, my_store.register_suffix, '/')
        self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')

    def test__relpath_unregister_suffixes(self):
        """_relpath rejects suffixes that were never registered."""
        my_store = store.TransportStore(MockTransport())
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])

    def test__relpath_simple(self):
        """An unprefixed store maps an id to itself."""
        my_store = store.TransportStore(MockTransport())
        self.assertEqual("foo", my_store._relpath('foo'))

    def test__relpath_prefixed(self):
        """A prefixed store maps 'foo' to '45/foo'."""
        my_store = store.TransportStore(MockTransport(), True)
        self.assertEqual('45/foo', my_store._relpath('foo'))

    def test__relpath_simple_suffixed(self):
        """Registered suffixes are appended in order, joined by dots."""
        my_store = store.TransportStore(MockTransport())
        my_store.register_suffix('bar')
        my_store.register_suffix('baz')
        self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
        self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))

    def test__relpath_prefixed_suffixed(self):
        """Prefix and suffixes combine, e.g. '45/foo.bar.baz'."""
        my_store = store.TransportStore(MockTransport(), True)
        my_store.register_suffix('bar')
        my_store.register_suffix('baz')
        self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
        self.assertEqual('45/foo.bar.baz',
                         my_store._relpath('foo', ['bar', 'baz']))

    def test_add_simple(self):
        """add() hands the plain id and the stream to _add."""
        stream = StringIO("content")
        my_store = InstrumentedTransportStore(MockTransport())
        my_store.add(stream, "foo")
        self.assertEqual([("_add", "foo", stream)], my_store._calls)

    def test_add_prefixed(self):
        """add() on a prefixed store hands '45/foo' to _add."""
        stream = StringIO("content")
        my_store = InstrumentedTransportStore(MockTransport(), True)
        my_store.add(stream, "foo")
        self.assertEqual([("_add", "45/foo", stream)], my_store._calls)

    def test_add_simple_suffixed(self):
        """add() with a suffix hands 'foo.dsc' to _add."""
        stream = StringIO("content")
        my_store = InstrumentedTransportStore(MockTransport())
        my_store.register_suffix('dsc')
        my_store.add(stream, "foo", 'dsc')
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)

    # Previously also named test_add_simple_suffixed, which replaced the
    # unprefixed test above in the class namespace so it never ran.
    def test_add_prefixed_suffixed(self):
        """add() with a suffix on a prefixed store hands '45/foo.dsc' to _add."""
        stream = StringIO("content")
        my_store = InstrumentedTransportStore(MockTransport(), True)
        my_store.register_suffix('dsc')
        my_store.add(stream, "foo", 'dsc')
        self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)

    def get_populated_store(self, prefixed=False,
                            store_class=TextStore, compressed=False):
        """Return a new in-memory store holding a small fixed data set.

        The store gets the 'sig' suffix registered and three entries:
        'foo' with suffix 'sig', plain 'foo', and 'missing' with suffix
        'sig' (a signature whose base entry is absent).

        :param prefixed: passed positionally to ``store_class``.
        :param store_class: store type to instantiate over a MemoryTransport.
        :param compressed: passed as ``compressed=`` to ``store_class``.
        :return: the populated store.
        """
        my_store = store_class(MemoryTransport(), prefixed,
                               compressed=compressed)
        my_store.register_suffix('sig')
        stream = StringIO("signature")
        my_store.add(stream, "foo", 'sig')
        stream = StringIO("content")
        my_store.add(stream, "foo")
        stream = StringIO("signature for missing base")
        my_store.add(stream, "missing", 'sig')
        # Every caller uses the result, so the store must be handed back.
        return my_store

    def test_has_simple(self):
        """has_id finds a plain entry, prefixed or not."""
        my_store = self.get_populated_store()
        self.assertEqual(True, my_store.has_id('foo'))
        my_store = self.get_populated_store(True)
        self.assertEqual(True, my_store.has_id('foo'))

    def test_has_suffixed(self):
        """has_id finds a suffixed entry, prefixed or not."""
        my_store = self.get_populated_store()
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
        my_store = self.get_populated_store(True)
        self.assertEqual(True, my_store.has_id('foo', 'sig'))

    def test_has_suffixed_no_base(self):
        """A suffixed entry alone does not make its base id present."""
        my_store = self.get_populated_store()
        self.assertEqual(False, my_store.has_id('missing'))
        my_store = self.get_populated_store(True)
        self.assertEqual(False, my_store.has_id('missing'))

    def test_get_simple(self):
        """get returns the content of a plain entry."""
        my_store = self.get_populated_store()
        self.assertEqual('content', my_store.get('foo').read())
        my_store = self.get_populated_store(True)
        self.assertEqual('content', my_store.get('foo').read())

    def test_get_suffixed(self):
        """get with a suffix returns the suffixed entry's content."""
        my_store = self.get_populated_store()
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
        my_store = self.get_populated_store(True)
        self.assertEqual('signature', my_store.get('foo', 'sig').read())

    def test_get_suffixed_no_base(self):
        """A suffixed entry is readable even when its base entry is absent."""
        my_store = self.get_populated_store()
        self.assertEqual('signature for missing base',
                         my_store.get('missing', 'sig').read())
        my_store = self.get_populated_store(True)
        self.assertEqual('signature for missing base',
                         my_store.get('missing', 'sig').read())

    def test___iter__no_suffix(self):
        """Iterating a store with one plain entry yields that id."""
        my_store = TextStore(MemoryTransport(),
                             prefixed=False, compressed=False)
        stream = StringIO("content")
        my_store.add(stream, "foo")
        self.assertEqual(set(['foo']),
                         set(my_store.__iter__()))

    def test___iter__(self):
        """Iteration yields only 'foo' from the populated store."""
        self.assertEqual(set(['foo']),
                         set(self.get_populated_store().__iter__()))
        self.assertEqual(set(['foo']),
                         set(self.get_populated_store(True).__iter__()))

    def test___iter__compressed(self):
        """Iteration over compressed stores also yields only 'foo'."""
        self.assertEqual(set(['foo']),
                         set(self.get_populated_store(
                             compressed=True).__iter__()))
        self.assertEqual(set(['foo']),
                         set(self.get_populated_store(
                             True, compressed=True).__iter__()))

    def test___len__(self):
        """len() of the populated store is 1."""
        self.assertEqual(1, len(self.get_populated_store()))

    def test_copy_suffixes(self):
        """copy_all_ids copies 'foo' with its 'sig' but not the orphan sig."""
        from_store = self.get_populated_store()
        to_store = TextStore(MemoryTransport(),
                             prefixed=True, compressed=True)
        to_store.register_suffix('sig')
        to_store.copy_all_ids(from_store)
        self.assertEqual(1, len(to_store))
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
        self.assertEqual('content', to_store.get('foo').read())
        self.assertEqual('signature', to_store.get('foo', 'sig').read())
        self.assertRaises(KeyError, to_store.get, 'missing', 'sig')

    def test_relpath_escaped(self):
        """A '%' in a file id is mapped to '%25'."""
        my_store = store.TransportStore(MemoryTransport())
        self.assertEqual('%25', my_store._relpath('%'))

    def test_escaped_uppercase(self):
        """Uppercase letters are escaped for safety on Windows"""
        my_store = store.TransportStore(MemoryTransport(), escaped=True)
        # a particularly perverse file-id! :-)
        self.assertEquals(my_store._escape_file_id('C:<>'), '%43%3a%3c%3e')
406
class TestVersionFileStore(TestCaseWithTransport):
    """Tests for how VersionedFileStore weaves interact with transactions."""

    def get_scope(self):
        """Return the transaction the current test has set, or None."""
        return self._transaction

    def setUp(self):
        """Create an in-memory VersionedFileStore scoped by get_scope."""
        super(TestVersionFileStore, self).setUp()
        self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
        # The store asks this test for its scope; each test swaps
        # self._transaction to control what the store sees.
        self.vfstore.get_scope = self.get_scope
        self._transaction = None

    def test_get_weave_registers_dirty_in_write(self):
        """A weave from a write transaction rejects writes once it finishes."""
        self._transaction = transactions.WriteTransaction()
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
        self._transaction.finish()
        self._transaction = None
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
        self._transaction = transactions.WriteTransaction()
        vf = self.vfstore.get_weave('id', self._transaction)
        self._transaction.finish()
        self._transaction = None
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])

    def test_get_weave_readonly_cant_write(self):
        """A weave from a read-only transaction raises ReadOnlyError on write."""
        self._transaction = transactions.WriteTransaction()
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
        self._transaction.finish()
        self._transaction = transactions.ReadOnlyTransaction()
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])