~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/teststore.py

and the tutorial patch came back, the very next day

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
"""Test Store implementations."""
18
 
 
19
 
from cStringIO import StringIO
20
 
import os
21
 
import gzip
22
 
 
23
 
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
24
 
from bzrlib.store import copy_all
25
 
from bzrlib.transport.local import LocalTransport
26
 
from bzrlib.store.text import TextStore
27
 
from bzrlib.tests import TestCase, TestCaseInTempDir
28
 
import bzrlib.store as store
29
 
import bzrlib.transport as transport
30
 
from bzrlib.transport.memory import MemoryTransport
31
 
 
32
 
 
33
 
class TestStores(object):
34
 
    """Mixin template class that provides some common tests for stores"""
35
 
 
36
 
    def check_content(self, store, fileid, value):
37
 
        f = store.get(fileid)
38
 
        self.assertEqual(f.read(), value)
39
 
 
40
 
    def fill_store(self, store):
41
 
        store.add(StringIO('hello'), 'a')
42
 
        store.add(StringIO('other'), 'b')
43
 
        store.add(StringIO('something'), 'c')
44
 
        store.add(StringIO('goodbye'), '123123')
45
 
 
46
 
    def test_copy_all(self):
47
 
        """Test copying"""
48
 
        os.mkdir('a')
49
 
        store_a = self.get_store('a')
50
 
        store_a.add('foo', '1')
51
 
        os.mkdir('b')
52
 
        store_b = self.get_store('b')
53
 
        copy_all(store_a, store_b)
54
 
        self.assertEqual(store_a.get('1').read(), 'foo')
55
 
        self.assertEqual(store_b.get('1').read(), 'foo')
56
 
        # TODO: Switch the exception form UnlistableStore to
57
 
        #       or make Stores throw UnlistableStore if their
58
 
        #       Transport doesn't support listing
59
 
        # store_c = RemoteStore('http://example.com/')
60
 
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
61
 
 
62
 
    def test_get(self):
63
 
        store = self.get_store()
64
 
        self.fill_store(store)
65
 
    
66
 
        self.check_content(store, 'a', 'hello')
67
 
        self.check_content(store, 'b', 'other')
68
 
        self.check_content(store, 'c', 'something')
69
 
    
70
 
        # Make sure that requesting a non-existing file fails
71
 
        self.assertRaises(KeyError, self.check_content, store, 'd', None)
72
 
 
 
17
"""Test Store implementation
 
18
"""
 
19
from bzrlib.store import ImmutableStore
 
20
from bzrlib.selftest import TestCaseInTempDir
 
21
from StringIO import StringIO
 
22
from bzrlib.errors import BzrError
 
23
 
 
24
class TestStore(TestCaseInTempDir):
73
25
    def test_multiple_add(self):
74
26
        """Multiple add with same ID should raise a BzrError"""
75
 
        store = self.get_store()
76
 
        self.fill_store(store)
 
27
        store = ImmutableStore('.')
 
28
        store.add(StringIO('goodbye'), '123123')
77
29
        self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
78
30
 
79
 
 
80
 
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
81
 
 
82
 
    def get_store(self, path=u'.'):
83
 
        t = LocalTransport(path)
84
 
        return TextStore(t, compressed=True)
85
 
 
86
 
    def test_total_size(self):
87
 
        store = self.get_store(u'.')
88
 
        store.register_suffix('dsc')
89
 
        store.add(StringIO('goodbye'), '123123')
90
 
        store.add(StringIO('goodbye2'), '123123', 'dsc')
91
 
        # these get gzipped - content should be stable
92
 
        self.assertEqual(store.total_size(), (2, 55))
93
 
        
94
 
    def test__relpath_suffixed(self):
95
 
        my_store = TextStore(MockTransport(),
96
 
                             prefixed=True, compressed=True)
97
 
        my_store.register_suffix('dsc')
98
 
        self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
99
 
 
100
 
 
101
 
class TestMemoryStore(TestCase):
102
 
    
103
 
    def get_store(self):
104
 
        return store.ImmutableMemoryStore()
105
 
    
106
 
    def test_imports(self):
107
 
        from bzrlib.store import ImmutableMemoryStore
108
 
 
109
 
    def test_add_and_retrieve(self):
110
 
        store = self.get_store()
111
 
        store.add(StringIO('hello'), 'aa')
112
 
        self.assertNotEqual(store.get('aa'), None)
113
 
        self.assertEqual(store.get('aa').read(), 'hello')
114
 
        store.add(StringIO('hello world'), 'bb')
115
 
        self.assertNotEqual(store.get('bb'), None)
116
 
        self.assertEqual(store.get('bb').read(), 'hello world')
117
 
 
118
 
    def test_missing_is_absent(self):
119
 
        store = self.get_store()
120
 
        self.failIf('aa' in store)
121
 
 
122
 
    def test_adding_fails_when_present(self):
123
 
        my_store = self.get_store()
124
 
        my_store.add(StringIO('hello'), 'aa')
125
 
        self.assertRaises(BzrError,
126
 
                          my_store.add, StringIO('hello'), 'aa')
127
 
 
128
 
    def test_total_size(self):
129
 
        store = self.get_store()
130
 
        store.add(StringIO('goodbye'), '123123')
131
 
        store.add(StringIO('goodbye2'), '123123.dsc')
132
 
        self.assertEqual(store.total_size(), (2, 15))
133
 
        # TODO: Switch the exception form UnlistableStore to
134
 
        #       or make Stores throw UnlistableStore if their
135
 
        #       Transport doesn't support listing
136
 
        # store_c = RemoteStore('http://example.com/')
137
 
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
138
 
 
139
 
 
140
 
class TestTextStore(TestCaseInTempDir, TestStores):
141
 
 
142
 
    def get_store(self, path=u'.'):
143
 
        t = LocalTransport(path)
144
 
        return TextStore(t, compressed=False)
145
 
 
146
 
    def test_total_size(self):
147
 
        store = self.get_store()
148
 
        store.add(StringIO('goodbye'), '123123')
149
 
        store.add(StringIO('goodbye2'), '123123.dsc')
150
 
        self.assertEqual(store.total_size(), (2, 15))
151
 
        # TODO: Switch the exception form UnlistableStore to
152
 
        #       or make Stores throw UnlistableStore if their
153
 
        #       Transport doesn't support listing
154
 
        # store_c = RemoteStore('http://example.com/')
155
 
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
156
 
 
157
 
 
158
 
class TestMixedTextStore(TestCaseInTempDir, TestStores):
159
 
 
160
 
    def get_store(self, path=u'.', compressed=True):
161
 
        t = LocalTransport(path)
162
 
        return TextStore(t, compressed=compressed)
163
 
 
164
 
    def test_get_mixed(self):
165
 
        cs = self.get_store(u'.', compressed=True)
166
 
        s = self.get_store(u'.', compressed=False)
167
 
        cs.add(StringIO('hello there'), 'a')
168
 
 
169
 
        self.failUnlessExists('a.gz')
170
 
        self.failIf(os.path.lexists('a'))
171
 
 
172
 
        self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
173
 
 
174
 
        self.assertEquals(cs.has_id('a'), True)
175
 
        self.assertEquals(s.has_id('a'), True)
176
 
        self.assertEquals(cs.get('a').read(), 'hello there')
177
 
        self.assertEquals(s.get('a').read(), 'hello there')
178
 
        
179
 
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
180
 
 
181
 
        s.add(StringIO('goodbye'), 'b')
182
 
        self.failUnlessExists('b')
183
 
        self.failIf(os.path.lexists('b.gz'))
184
 
        self.assertEquals(open('b').read(), 'goodbye')
185
 
 
186
 
        self.assertEquals(cs.has_id('b'), True)
187
 
        self.assertEquals(s.has_id('b'), True)
188
 
        self.assertEquals(cs.get('b').read(), 'goodbye')
189
 
        self.assertEquals(s.get('b').read(), 'goodbye')
190
 
        
191
 
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
192
 
 
193
 
class MockTransport(transport.Transport):
194
 
    """A fake transport for testing with."""
195
 
 
196
 
    def has(self, filename):
197
 
        return False
198
 
 
199
 
    def __init__(self, url=None):
200
 
        if url is None:
201
 
            url = "http://example.com"
202
 
        super(MockTransport, self).__init__(url)
203
 
 
204
 
    def mkdir(self, filename):
205
 
        return
206
 
 
207
 
 
208
 
class InstrumentedTransportStore(store.TransportStore):
209
 
    """An instrumented TransportStore.
210
 
 
211
 
    Here we replace template method worker methods with calls that record the
212
 
    expected results.
213
 
    """
214
 
 
215
 
    def _add(self, filename, file):
216
 
        self._calls.append(("_add", filename, file))
217
 
 
218
 
    def __init__(self, transport, prefixed=False):
219
 
        super(InstrumentedTransportStore, self).__init__(transport, prefixed)
220
 
        self._calls = []
221
 
 
222
 
 
223
 
class TestInstrumentedTransportStore(TestCase):
224
 
 
225
 
    def test__add_records(self):
226
 
        my_store = InstrumentedTransportStore(MockTransport())
227
 
        my_store._add("filename", "file")
228
 
        self.assertEqual([("_add", "filename", "file")], my_store._calls)
229
 
 
230
 
 
231
 
class TestMockTransport(TestCase):
232
 
 
233
 
    def test_isinstance(self):
234
 
        self.failUnless(isinstance(MockTransport(), transport.Transport))
235
 
 
236
 
    def test_has(self):
237
 
        self.assertEqual(False, MockTransport().has('foo'))
238
 
 
239
 
    def test_mkdir(self):
240
 
        MockTransport().mkdir('45')
241
 
 
242
 
 
243
 
class TestTransportStore(TestCase):
244
 
    
245
 
    def test__relpath_invalid(self):
246
 
        my_store = store.TransportStore(MockTransport())
247
 
        self.assertRaises(ValueError, my_store._relpath, '/foo')
248
 
        self.assertRaises(ValueError, my_store._relpath, 'foo/')
249
 
 
250
 
    def test_register_invalid_suffixes(self):
251
 
        my_store = store.TransportStore(MockTransport())
252
 
        self.assertRaises(ValueError, my_store.register_suffix, '/')
253
 
        self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
254
 
 
255
 
    def test__relpath_unregister_suffixes(self):
256
 
        my_store = store.TransportStore(MockTransport())
257
 
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
258
 
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
259
 
 
260
 
    def test__relpath_simple(self):
261
 
        my_store = store.TransportStore(MockTransport())
262
 
        self.assertEqual("foo", my_store._relpath('foo'))
263
 
 
264
 
    def test__relpath_prefixed(self):
265
 
        my_store = store.TransportStore(MockTransport(), True)
266
 
        self.assertEqual('45/foo', my_store._relpath('foo'))
267
 
 
268
 
    def test__relpath_simple_suffixed(self):
269
 
        my_store = store.TransportStore(MockTransport())
270
 
        my_store.register_suffix('bar')
271
 
        my_store.register_suffix('baz')
272
 
        self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
273
 
        self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
274
 
 
275
 
    def test__relpath_prefixed_suffixed(self):
276
 
        my_store = store.TransportStore(MockTransport(), True)
277
 
        my_store.register_suffix('bar')
278
 
        my_store.register_suffix('baz')
279
 
        self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
280
 
        self.assertEqual('45/foo.bar.baz',
281
 
                         my_store._relpath('foo', ['bar', 'baz']))
282
 
 
283
 
    def test_add_simple(self):
284
 
        stream = StringIO("content")
285
 
        my_store = InstrumentedTransportStore(MockTransport())
286
 
        my_store.add(stream, "foo")
287
 
        self.assertEqual([("_add", "foo", stream)], my_store._calls)
288
 
 
289
 
    def test_add_prefixed(self):
290
 
        stream = StringIO("content")
291
 
        my_store = InstrumentedTransportStore(MockTransport(), True)
292
 
        my_store.add(stream, "foo")
293
 
        self.assertEqual([("_add", "45/foo", stream)], my_store._calls)
294
 
 
295
 
    def test_add_simple_suffixed(self):
296
 
        stream = StringIO("content")
297
 
        my_store = InstrumentedTransportStore(MockTransport())
298
 
        my_store.register_suffix('dsc')
299
 
        my_store.add(stream, "foo", 'dsc')
300
 
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
301
 
        
302
 
    def test_add_simple_suffixed(self):
303
 
        stream = StringIO("content")
304
 
        my_store = InstrumentedTransportStore(MockTransport(), True)
305
 
        my_store.register_suffix('dsc')
306
 
        my_store.add(stream, "foo", 'dsc')
307
 
        self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
308
 
 
309
 
    def get_populated_store(self, prefixed=False,
310
 
            store_class=TextStore, compressed=False):
311
 
        my_store = store_class(MemoryTransport(), prefixed,
312
 
                               compressed=compressed)
313
 
        my_store.register_suffix('sig')
314
 
        stream = StringIO("signature")
315
 
        my_store.add(stream, "foo", 'sig')
316
 
        stream = StringIO("content")
317
 
        my_store.add(stream, "foo")
318
 
        stream = StringIO("signature for missing base")
319
 
        my_store.add(stream, "missing", 'sig')
320
 
        return my_store
321
 
        
322
 
    def test_has_simple(self):
323
 
        my_store = self.get_populated_store()
324
 
        self.assertEqual(True, my_store.has_id('foo'))
325
 
        my_store = self.get_populated_store(True)
326
 
        self.assertEqual(True, my_store.has_id('foo'))
327
 
 
328
 
    def test_has_suffixed(self):
329
 
        my_store = self.get_populated_store()
330
 
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
331
 
        my_store = self.get_populated_store(True)
332
 
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
333
 
 
334
 
    def test_has_suffixed_no_base(self):
335
 
        my_store = self.get_populated_store()
336
 
        self.assertEqual(False, my_store.has_id('missing'))
337
 
        my_store = self.get_populated_store(True)
338
 
        self.assertEqual(False, my_store.has_id('missing'))
339
 
 
340
 
    def test_get_simple(self):
341
 
        my_store = self.get_populated_store()
342
 
        self.assertEqual('content', my_store.get('foo').read())
343
 
        my_store = self.get_populated_store(True)
344
 
        self.assertEqual('content', my_store.get('foo').read())
345
 
 
346
 
    def test_get_suffixed(self):
347
 
        my_store = self.get_populated_store()
348
 
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
349
 
        my_store = self.get_populated_store(True)
350
 
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
351
 
 
352
 
    def test_get_suffixed_no_base(self):
353
 
        my_store = self.get_populated_store()
354
 
        self.assertEqual('signature for missing base',
355
 
                         my_store.get('missing', 'sig').read())
356
 
        my_store = self.get_populated_store(True)
357
 
        self.assertEqual('signature for missing base',
358
 
                         my_store.get('missing', 'sig').read())
359
 
 
360
 
    def test___iter__no_suffix(self):
361
 
        my_store = TextStore(MemoryTransport(),
362
 
                             prefixed=False, compressed=False)
363
 
        stream = StringIO("content")
364
 
        my_store.add(stream, "foo")
365
 
        self.assertEqual(set(['foo']),
366
 
                         set(my_store.__iter__()))
367
 
 
368
 
    def test___iter__(self):
369
 
        self.assertEqual(set(['foo']),
370
 
                         set(self.get_populated_store().__iter__()))
371
 
        self.assertEqual(set(['foo']),
372
 
                         set(self.get_populated_store(True).__iter__()))
373
 
 
374
 
    def test___iter__compressed(self):
375
 
        self.assertEqual(set(['foo']),
376
 
                         set(self.get_populated_store(
377
 
                             compressed=True).__iter__()))
378
 
        self.assertEqual(set(['foo']),
379
 
                         set(self.get_populated_store(
380
 
                             True, compressed=True).__iter__()))
381
 
 
382
 
    def test___len__(self):
383
 
        self.assertEqual(1, len(self.get_populated_store()))
384
 
 
385
 
    def test_copy_suffixes(self):
386
 
        from_store = self.get_populated_store()
387
 
        to_store = TextStore(MemoryTransport(),
388
 
                             prefixed=True, compressed=True)
389
 
        to_store.register_suffix('sig')
390
 
        copy_all(from_store, to_store)
391
 
        self.assertEqual(1, len(to_store))
392
 
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
393
 
        self.assertEqual('content', to_store.get('foo').read())
394
 
        self.assertEqual('signature', to_store.get('foo', 'sig').read())
395
 
        self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
396
 
 
397
 
    def test_relpath_escaped(self):
398
 
        my_store = store.TransportStore(MemoryTransport())
399
 
        self.assertEqual('%25', my_store._relpath('%'))
 
31
TEST_CLASSES = [
 
32
    TestStore,
 
33
    ]