1
# Copyright (C) 2005 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Test Store implementations."""
19
from cStringIO import StringIO
23
import bzrlib.errors as errors
24
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
25
from bzrlib.transport.local import LocalTransport
26
from bzrlib.store.text import TextStore
27
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
28
import bzrlib.store as store
29
import bzrlib.transactions as transactions
30
import bzrlib.transport as transport
31
from bzrlib.transport.memory import MemoryTransport
34
class TestStores(object):
35
"""Mixin template class that provides some common tests for stores"""
37
def check_content(self, store, fileid, value):
39
self.assertEqual(f.read(), value)
41
def test_add_str_deprecated(self):
43
store = self.get_store('a')
44
self.callDeprecated(['Passing a string to Store.add'
45
' was deprecated in version 0.11.'],
46
store.add, 'foo', '1')
47
self.assertEqual('foo', store.get('1').read())
49
def fill_store(self, store):
50
store.add(StringIO('hello'), 'a')
51
store.add(StringIO('other'), 'b')
52
store.add(StringIO('something'), 'c')
53
store.add(StringIO('goodbye'), '123123')
55
def test_copy_all(self):
58
store_a = self.get_store('a')
59
store_a.add(StringIO('foo'), '1')
61
store_b = self.get_store('b')
62
store_b.copy_all_ids(store_a)
63
self.assertEqual(store_a.get('1').read(), 'foo')
64
self.assertEqual(store_b.get('1').read(), 'foo')
65
# TODO: Switch the exception form UnlistableStore to
66
# or make Stores throw UnlistableStore if their
67
# Transport doesn't support listing
68
# store_c = RemoteStore('http://example.com/')
69
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
72
store = self.get_store()
73
self.fill_store(store)
75
self.check_content(store, 'a', 'hello')
76
self.check_content(store, 'b', 'other')
77
self.check_content(store, 'c', 'something')
79
# Make sure that requesting a non-existing file fails
80
self.assertRaises(KeyError, self.check_content, store, 'd', None)
82
def test_multiple_add(self):
83
"""Multiple add with same ID should raise a BzrError"""
84
store = self.get_store()
85
self.fill_store(store)
86
self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
89
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
91
def get_store(self, path=u'.'):
92
t = transport.get_transport(path)
93
return TextStore(t, compressed=True)
95
def test_total_size(self):
96
store = self.get_store(u'.')
97
store.register_suffix('dsc')
98
store.add(StringIO('goodbye'), '123123')
99
store.add(StringIO('goodbye2'), '123123', 'dsc')
100
# these get gzipped - content should be stable
101
self.assertEqual(store.total_size(), (2, 55))
103
def test__relpath_suffixed(self):
104
my_store = TextStore(MockTransport(),
105
prefixed=True, compressed=True)
106
my_store.register_suffix('dsc')
107
self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
110
class TestMemoryStore(TestCase):
113
return TextStore(MemoryTransport())
115
def test_add_and_retrieve(self):
116
store = self.get_store()
117
store.add(StringIO('hello'), 'aa')
118
self.assertNotEqual(store.get('aa'), None)
119
self.assertEqual(store.get('aa').read(), 'hello')
120
store.add(StringIO('hello world'), 'bb')
121
self.assertNotEqual(store.get('bb'), None)
122
self.assertEqual(store.get('bb').read(), 'hello world')
124
def test_missing_is_absent(self):
125
store = self.get_store()
126
self.failIf('aa' in store)
128
def test_adding_fails_when_present(self):
129
my_store = self.get_store()
130
my_store.add(StringIO('hello'), 'aa')
131
self.assertRaises(BzrError,
132
my_store.add, StringIO('hello'), 'aa')
134
def test_total_size(self):
135
store = self.get_store()
136
store.add(StringIO('goodbye'), '123123')
137
store.add(StringIO('goodbye2'), '123123.dsc')
138
self.assertEqual(store.total_size(), (2, 15))
139
# TODO: Switch the exception form UnlistableStore to
140
# or make Stores throw UnlistableStore if their
141
# Transport doesn't support listing
142
# store_c = RemoteStore('http://example.com/')
143
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
146
class TestTextStore(TestCaseInTempDir, TestStores):
148
def get_store(self, path=u'.'):
149
t = transport.get_transport(path)
150
return TextStore(t, compressed=False)
152
def test_total_size(self):
153
store = self.get_store()
154
store.add(StringIO('goodbye'), '123123')
155
store.add(StringIO('goodbye2'), '123123.dsc')
156
self.assertEqual(store.total_size(), (2, 15))
157
# TODO: Switch the exception form UnlistableStore to
158
# or make Stores throw UnlistableStore if their
159
# Transport doesn't support listing
160
# store_c = RemoteStore('http://example.com/')
161
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
164
class TestMixedTextStore(TestCaseInTempDir, TestStores):
166
def get_store(self, path=u'.', compressed=True):
167
t = transport.get_transport(path)
168
return TextStore(t, compressed=compressed)
170
def test_get_mixed(self):
171
cs = self.get_store(u'.', compressed=True)
172
s = self.get_store(u'.', compressed=False)
173
cs.add(StringIO('hello there'), 'a')
175
self.failUnlessExists('a.gz')
176
self.failIf(os.path.lexists('a'))
178
self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
180
self.assertEquals(cs.has_id('a'), True)
181
self.assertEquals(s.has_id('a'), True)
182
self.assertEquals(cs.get('a').read(), 'hello there')
183
self.assertEquals(s.get('a').read(), 'hello there')
185
self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
187
s.add(StringIO('goodbye'), 'b')
188
self.failUnlessExists('b')
189
self.failIf(os.path.lexists('b.gz'))
190
self.assertEquals(open('b').read(), 'goodbye')
192
self.assertEquals(cs.has_id('b'), True)
193
self.assertEquals(s.has_id('b'), True)
194
self.assertEquals(cs.get('b').read(), 'goodbye')
195
self.assertEquals(s.get('b').read(), 'goodbye')
197
self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
199
class MockTransport(transport.Transport):
200
"""A fake transport for testing with."""
202
def has(self, filename):
205
def __init__(self, url=None):
207
url = "http://example.com"
208
super(MockTransport, self).__init__(url)
210
def mkdir(self, filename):
214
class InstrumentedTransportStore(store.TransportStore):
215
"""An instrumented TransportStore.
217
Here we replace template method worker methods with calls that record the
221
def _add(self, filename, file):
222
self._calls.append(("_add", filename, file))
224
def __init__(self, transport, prefixed=False):
225
super(InstrumentedTransportStore, self).__init__(transport, prefixed)
229
class TestInstrumentedTransportStore(TestCase):
231
def test__add_records(self):
232
my_store = InstrumentedTransportStore(MockTransport())
233
my_store._add("filename", "file")
234
self.assertEqual([("_add", "filename", "file")], my_store._calls)
237
class TestMockTransport(TestCase):
239
def test_isinstance(self):
240
self.failUnless(isinstance(MockTransport(), transport.Transport))
243
self.assertEqual(False, MockTransport().has('foo'))
245
def test_mkdir(self):
246
MockTransport().mkdir('45')
249
class TestTransportStore(TestCase):
251
def test__relpath_invalid(self):
252
my_store = store.TransportStore(MockTransport())
253
self.assertRaises(ValueError, my_store._relpath, '/foo')
254
self.assertRaises(ValueError, my_store._relpath, 'foo/')
256
def test_register_invalid_suffixes(self):
257
my_store = store.TransportStore(MockTransport())
258
self.assertRaises(ValueError, my_store.register_suffix, '/')
259
self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
261
def test__relpath_unregister_suffixes(self):
262
my_store = store.TransportStore(MockTransport())
263
self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
264
self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
266
def test__relpath_simple(self):
267
my_store = store.TransportStore(MockTransport())
268
self.assertEqual("foo", my_store._relpath('foo'))
270
def test__relpath_prefixed(self):
271
my_store = store.TransportStore(MockTransport(), True)
272
self.assertEqual('45/foo', my_store._relpath('foo'))
274
def test__relpath_simple_suffixed(self):
275
my_store = store.TransportStore(MockTransport())
276
my_store.register_suffix('bar')
277
my_store.register_suffix('baz')
278
self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
279
self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
281
def test__relpath_prefixed_suffixed(self):
282
my_store = store.TransportStore(MockTransport(), True)
283
my_store.register_suffix('bar')
284
my_store.register_suffix('baz')
285
self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
286
self.assertEqual('45/foo.bar.baz',
287
my_store._relpath('foo', ['bar', 'baz']))
289
def test_add_simple(self):
290
stream = StringIO("content")
291
my_store = InstrumentedTransportStore(MockTransport())
292
my_store.add(stream, "foo")
293
self.assertEqual([("_add", "foo", stream)], my_store._calls)
295
def test_add_prefixed(self):
296
stream = StringIO("content")
297
my_store = InstrumentedTransportStore(MockTransport(), True)
298
my_store.add(stream, "foo")
299
self.assertEqual([("_add", "45/foo", stream)], my_store._calls)
301
def test_add_simple_suffixed(self):
302
stream = StringIO("content")
303
my_store = InstrumentedTransportStore(MockTransport())
304
my_store.register_suffix('dsc')
305
my_store.add(stream, "foo", 'dsc')
306
self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
308
def test_add_simple_suffixed(self):
309
stream = StringIO("content")
310
my_store = InstrumentedTransportStore(MockTransport(), True)
311
my_store.register_suffix('dsc')
312
my_store.add(stream, "foo", 'dsc')
313
self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
315
def get_populated_store(self, prefixed=False,
316
store_class=TextStore, compressed=False):
317
my_store = store_class(MemoryTransport(), prefixed,
318
compressed=compressed)
319
my_store.register_suffix('sig')
320
stream = StringIO("signature")
321
my_store.add(stream, "foo", 'sig')
322
stream = StringIO("content")
323
my_store.add(stream, "foo")
324
stream = StringIO("signature for missing base")
325
my_store.add(stream, "missing", 'sig')
328
def test_has_simple(self):
329
my_store = self.get_populated_store()
330
self.assertEqual(True, my_store.has_id('foo'))
331
my_store = self.get_populated_store(True)
332
self.assertEqual(True, my_store.has_id('foo'))
334
def test_has_suffixed(self):
335
my_store = self.get_populated_store()
336
self.assertEqual(True, my_store.has_id('foo', 'sig'))
337
my_store = self.get_populated_store(True)
338
self.assertEqual(True, my_store.has_id('foo', 'sig'))
340
def test_has_suffixed_no_base(self):
341
my_store = self.get_populated_store()
342
self.assertEqual(False, my_store.has_id('missing'))
343
my_store = self.get_populated_store(True)
344
self.assertEqual(False, my_store.has_id('missing'))
346
def test_get_simple(self):
347
my_store = self.get_populated_store()
348
self.assertEqual('content', my_store.get('foo').read())
349
my_store = self.get_populated_store(True)
350
self.assertEqual('content', my_store.get('foo').read())
352
def test_get_suffixed(self):
353
my_store = self.get_populated_store()
354
self.assertEqual('signature', my_store.get('foo', 'sig').read())
355
my_store = self.get_populated_store(True)
356
self.assertEqual('signature', my_store.get('foo', 'sig').read())
358
def test_get_suffixed_no_base(self):
359
my_store = self.get_populated_store()
360
self.assertEqual('signature for missing base',
361
my_store.get('missing', 'sig').read())
362
my_store = self.get_populated_store(True)
363
self.assertEqual('signature for missing base',
364
my_store.get('missing', 'sig').read())
366
def test___iter__no_suffix(self):
367
my_store = TextStore(MemoryTransport(),
368
prefixed=False, compressed=False)
369
stream = StringIO("content")
370
my_store.add(stream, "foo")
371
self.assertEqual(set(['foo']),
372
set(my_store.__iter__()))
374
def test___iter__(self):
375
self.assertEqual(set(['foo']),
376
set(self.get_populated_store().__iter__()))
377
self.assertEqual(set(['foo']),
378
set(self.get_populated_store(True).__iter__()))
380
def test___iter__compressed(self):
381
self.assertEqual(set(['foo']),
382
set(self.get_populated_store(
383
compressed=True).__iter__()))
384
self.assertEqual(set(['foo']),
385
set(self.get_populated_store(
386
True, compressed=True).__iter__()))
388
def test___len__(self):
389
self.assertEqual(1, len(self.get_populated_store()))
391
def test_copy_suffixes(self):
392
from_store = self.get_populated_store()
393
to_store = TextStore(MemoryTransport(),
394
prefixed=True, compressed=True)
395
to_store.register_suffix('sig')
396
to_store.copy_all_ids(from_store)
397
self.assertEqual(1, len(to_store))
398
self.assertEqual(set(['foo']), set(to_store.__iter__()))
399
self.assertEqual('content', to_store.get('foo').read())
400
self.assertEqual('signature', to_store.get('foo', 'sig').read())
401
self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
403
def test_relpath_escaped(self):
404
my_store = store.TransportStore(MemoryTransport())
405
self.assertEqual('%25', my_store._relpath('%'))
407
def test_escaped_uppercase(self):
408
"""Uppercase letters are escaped for safety on Windows"""
409
my_store = store.TransportStore(MemoryTransport(), escaped=True)
410
# a particularly perverse file-id! :-)
411
self.assertEquals(my_store._escape_file_id('C:<>'), '%43%3a%3c%3e')
414
class TestVersionFileStore(TestCaseWithTransport):
417
super(TestVersionFileStore, self).setUp()
418
self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
420
def test_get_weave_registers_dirty_in_write(self):
421
transaction = transactions.WriteTransaction()
422
vf = self.vfstore.get_weave_or_empty('id', transaction)
424
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
425
transaction = transactions.WriteTransaction()
426
vf = self.vfstore.get_weave('id', transaction)
428
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
430
def test_get_weave_or_empty_readonly_fails(self):
431
transaction = transactions.ReadOnlyTransaction()
432
vf = self.assertRaises(errors.ReadOnlyError,
433
self.vfstore.get_weave_or_empty,
437
def test_get_weave_readonly_cant_write(self):
438
transaction = transactions.WriteTransaction()
439
vf = self.vfstore.get_weave_or_empty('id', transaction)
441
transaction = transactions.ReadOnlyTransaction()
442
vf = self.vfstore.get_weave_or_empty('id', transaction)
443
self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])