~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_store.py

  • Committer: Martin Pool
  • Date: 2005-08-18 07:51:58 UTC
  • Revision ID: mbp@sourcefrog.net-20050818075157-5f69075fa843d558
- add space to store revision-id in weave files

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Development Ltd
2
 
 
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
 
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
 
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Test Store implementations."""
18
 
 
19
 
from cStringIO import StringIO
20
 
import os
21
 
import gzip
22
 
 
23
 
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
24
 
from bzrlib.store import copy_all
25
 
from bzrlib.transport.local import LocalTransport
26
 
from bzrlib.store.text import TextStore
27
 
from bzrlib.tests import TestCase, TestCaseInTempDir
28
 
import bzrlib.store as store
29
 
import bzrlib.transport as transport
30
 
from bzrlib.transport.memory import MemoryTransport
31
 
 
32
 
 
33
 
class TestStores(object):
34
 
    """Mixin template class that provides some common tests for stores"""
35
 
 
36
 
    def check_content(self, store, fileid, value):
37
 
        f = store.get(fileid)
38
 
        self.assertEqual(f.read(), value)
39
 
 
40
 
    def fill_store(self, store):
41
 
        store.add(StringIO('hello'), 'a')
42
 
        store.add(StringIO('other'), 'b')
43
 
        store.add(StringIO('something'), 'c')
44
 
        store.add(StringIO('goodbye'), '123123')
45
 
 
46
 
    def test_copy_all(self):
47
 
        """Test copying"""
48
 
        os.mkdir('a')
49
 
        store_a = self.get_store('a')
50
 
        store_a.add('foo', '1')
51
 
        os.mkdir('b')
52
 
        store_b = self.get_store('b')
53
 
        copy_all(store_a, store_b)
54
 
        self.assertEqual(store_a.get('1').read(), 'foo')
55
 
        self.assertEqual(store_b.get('1').read(), 'foo')
56
 
        # TODO: Switch the exception form UnlistableStore to
57
 
        #       or make Stores throw UnlistableStore if their
58
 
        #       Transport doesn't support listing
59
 
        # store_c = RemoteStore('http://example.com/')
60
 
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
61
 
 
62
 
    def test_get(self):
63
 
        store = self.get_store()
64
 
        self.fill_store(store)
65
 
    
66
 
        self.check_content(store, 'a', 'hello')
67
 
        self.check_content(store, 'b', 'other')
68
 
        self.check_content(store, 'c', 'something')
69
 
    
70
 
        # Make sure that requesting a non-existing file fails
71
 
        self.assertRaises(KeyError, self.check_content, store, 'd', None)
72
 
 
73
 
    def test_multiple_add(self):
74
 
        """Multiple add with same ID should raise a BzrError"""
75
 
        store = self.get_store()
76
 
        self.fill_store(store)
77
 
        self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
78
 
 
79
 
 
80
 
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
81
 
 
82
 
    def get_store(self, path=u'.'):
83
 
        t = LocalTransport(path)
84
 
        return TextStore(t, compressed=True)
85
 
 
86
 
    def test_total_size(self):
87
 
        store = self.get_store(u'.')
88
 
        store.register_suffix('dsc')
89
 
        store.add(StringIO('goodbye'), '123123')
90
 
        store.add(StringIO('goodbye2'), '123123', 'dsc')
91
 
        # these get gzipped - content should be stable
92
 
        self.assertEqual(store.total_size(), (2, 55))
93
 
        
94
 
    def test__relpath_suffixed(self):
95
 
        my_store = TextStore(MockTransport(), True, compressed=True)
96
 
        my_store.register_suffix('dsc')
97
 
        self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
98
 
 
99
 
 
100
 
class TestMemoryStore(TestCase):
101
 
    
102
 
    def get_store(self):
103
 
        return store.ImmutableMemoryStore()
104
 
    
105
 
    def test_imports(self):
106
 
        from bzrlib.store import ImmutableMemoryStore
107
 
 
108
 
    def test_add_and_retrieve(self):
109
 
        store = self.get_store()
110
 
        store.add(StringIO('hello'), 'aa')
111
 
        self.assertNotEqual(store.get('aa'), None)
112
 
        self.assertEqual(store.get('aa').read(), 'hello')
113
 
        store.add(StringIO('hello world'), 'bb')
114
 
        self.assertNotEqual(store.get('bb'), None)
115
 
        self.assertEqual(store.get('bb').read(), 'hello world')
116
 
 
117
 
    def test_missing_is_absent(self):
118
 
        store = self.get_store()
119
 
        self.failIf('aa' in store)
120
 
 
121
 
    def test_adding_fails_when_present(self):
122
 
        my_store = self.get_store()
123
 
        my_store.add(StringIO('hello'), 'aa')
124
 
        self.assertRaises(BzrError,
125
 
                          my_store.add, StringIO('hello'), 'aa')
126
 
 
127
 
    def test_total_size(self):
128
 
        store = self.get_store()
129
 
        store.add(StringIO('goodbye'), '123123')
130
 
        store.add(StringIO('goodbye2'), '123123.dsc')
131
 
        self.assertEqual(store.total_size(), (2, 15))
132
 
        # TODO: Switch the exception form UnlistableStore to
133
 
        #       or make Stores throw UnlistableStore if their
134
 
        #       Transport doesn't support listing
135
 
        # store_c = RemoteStore('http://example.com/')
136
 
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
137
 
 
138
 
 
139
 
class TestTextStore(TestCaseInTempDir, TestStores):
140
 
 
141
 
    def get_store(self, path=u'.'):
142
 
        t = LocalTransport(path)
143
 
        return TextStore(t, compressed=False)
144
 
 
145
 
    def test_total_size(self):
146
 
        store = self.get_store()
147
 
        store.add(StringIO('goodbye'), '123123')
148
 
        store.add(StringIO('goodbye2'), '123123.dsc')
149
 
        self.assertEqual(store.total_size(), (2, 15))
150
 
        # TODO: Switch the exception form UnlistableStore to
151
 
        #       or make Stores throw UnlistableStore if their
152
 
        #       Transport doesn't support listing
153
 
        # store_c = RemoteStore('http://example.com/')
154
 
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
155
 
 
156
 
 
157
 
class TestMixedTextStore(TestCaseInTempDir, TestStores):
158
 
 
159
 
    def get_store(self, path=u'.', compressed=True):
160
 
        t = LocalTransport(path)
161
 
        return TextStore(t, compressed=compressed)
162
 
 
163
 
    def test_get_mixed(self):
164
 
        cs = self.get_store(u'.', compressed=True)
165
 
        s = self.get_store(u'.', compressed=False)
166
 
        cs.add(StringIO('hello there'), 'a')
167
 
 
168
 
        self.failUnlessExists('a.gz')
169
 
        self.failIf(os.path.lexists('a'))
170
 
 
171
 
        self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
172
 
 
173
 
        self.assertEquals(cs.has_id('a'), True)
174
 
        self.assertEquals(s.has_id('a'), True)
175
 
        self.assertEquals(cs.get('a').read(), 'hello there')
176
 
        self.assertEquals(s.get('a').read(), 'hello there')
177
 
        
178
 
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
179
 
 
180
 
        s.add(StringIO('goodbye'), 'b')
181
 
        self.failUnlessExists('b')
182
 
        self.failIf(os.path.lexists('b.gz'))
183
 
        self.assertEquals(open('b').read(), 'goodbye')
184
 
 
185
 
        self.assertEquals(cs.has_id('b'), True)
186
 
        self.assertEquals(s.has_id('b'), True)
187
 
        self.assertEquals(cs.get('b').read(), 'goodbye')
188
 
        self.assertEquals(s.get('b').read(), 'goodbye')
189
 
        
190
 
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
191
 
 
192
 
class MockTransport(transport.Transport):
193
 
    """A fake transport for testing with."""
194
 
 
195
 
    def has(self, filename):
196
 
        return False
197
 
 
198
 
    def __init__(self, url=None):
199
 
        if url is None:
200
 
            url = "http://example.com"
201
 
        super(MockTransport, self).__init__(url)
202
 
 
203
 
    def mkdir(self, filename):
204
 
        return
205
 
 
206
 
 
207
 
class InstrumentedTransportStore(store.TransportStore):
208
 
    """An instrumented TransportStore.
209
 
 
210
 
    Here we replace template method worker methods with calls that record the
211
 
    expected results.
212
 
    """
213
 
 
214
 
    def _add(self, filename, file):
215
 
        self._calls.append(("_add", filename, file))
216
 
 
217
 
    def __init__(self, transport, prefixed=False):
218
 
        super(InstrumentedTransportStore, self).__init__(transport, prefixed)
219
 
        self._calls = []
220
 
 
221
 
 
222
 
class TestInstrumentedTransportStore(TestCase):
223
 
 
224
 
    def test__add_records(self):
225
 
        my_store = InstrumentedTransportStore(MockTransport())
226
 
        my_store._add("filename", "file")
227
 
        self.assertEqual([("_add", "filename", "file")], my_store._calls)
228
 
 
229
 
 
230
 
class TestMockTransport(TestCase):
231
 
 
232
 
    def test_isinstance(self):
233
 
        self.failUnless(isinstance(MockTransport(), transport.Transport))
234
 
 
235
 
    def test_has(self):
236
 
        self.assertEqual(False, MockTransport().has('foo'))
237
 
 
238
 
    def test_mkdir(self):
239
 
        MockTransport().mkdir('45')
240
 
 
241
 
 
242
 
class TestTransportStore(TestCase):
243
 
    
244
 
    def test__relpath_invalid(self):
245
 
        my_store = store.TransportStore(MockTransport())
246
 
        self.assertRaises(ValueError, my_store._relpath, '/foo')
247
 
        self.assertRaises(ValueError, my_store._relpath, 'foo/')
248
 
 
249
 
    def test_register_invalid_suffixes(self):
250
 
        my_store = store.TransportStore(MockTransport())
251
 
        self.assertRaises(ValueError, my_store.register_suffix, '/')
252
 
        self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
253
 
 
254
 
    def test__relpath_unregister_suffixes(self):
255
 
        my_store = store.TransportStore(MockTransport())
256
 
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
257
 
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
258
 
 
259
 
    def test__relpath_simple(self):
260
 
        my_store = store.TransportStore(MockTransport())
261
 
        self.assertEqual("foo", my_store._relpath('foo'))
262
 
 
263
 
    def test__relpath_prefixed(self):
264
 
        my_store = store.TransportStore(MockTransport(), True)
265
 
        self.assertEqual('45/foo', my_store._relpath('foo'))
266
 
 
267
 
    def test__relpath_simple_suffixed(self):
268
 
        my_store = store.TransportStore(MockTransport())
269
 
        my_store.register_suffix('bar')
270
 
        my_store.register_suffix('baz')
271
 
        self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
272
 
        self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
273
 
 
274
 
    def test__relpath_prefixed_suffixed(self):
275
 
        my_store = store.TransportStore(MockTransport(), True)
276
 
        my_store.register_suffix('bar')
277
 
        my_store.register_suffix('baz')
278
 
        self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
279
 
        self.assertEqual('45/foo.bar.baz',
280
 
                         my_store._relpath('foo', ['bar', 'baz']))
281
 
 
282
 
    def test_add_simple(self):
283
 
        stream = StringIO("content")
284
 
        my_store = InstrumentedTransportStore(MockTransport())
285
 
        my_store.add(stream, "foo")
286
 
        self.assertEqual([("_add", "foo", stream)], my_store._calls)
287
 
 
288
 
    def test_add_prefixed(self):
289
 
        stream = StringIO("content")
290
 
        my_store = InstrumentedTransportStore(MockTransport(), True)
291
 
        my_store.add(stream, "foo")
292
 
        self.assertEqual([("_add", "45/foo", stream)], my_store._calls)
293
 
 
294
 
    def test_add_simple_suffixed(self):
295
 
        stream = StringIO("content")
296
 
        my_store = InstrumentedTransportStore(MockTransport())
297
 
        my_store.register_suffix('dsc')
298
 
        my_store.add(stream, "foo", 'dsc')
299
 
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
300
 
        
301
 
    def test_add_simple_suffixed(self):
302
 
        stream = StringIO("content")
303
 
        my_store = InstrumentedTransportStore(MockTransport(), True)
304
 
        my_store.register_suffix('dsc')
305
 
        my_store.add(stream, "foo", 'dsc')
306
 
        self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
307
 
 
308
 
    def get_populated_store(self, prefixed=False,
309
 
            store_class=TextStore, compressed=False):
310
 
        my_store = store_class(MemoryTransport(), prefixed,
311
 
                               compressed=compressed)
312
 
        my_store.register_suffix('sig')
313
 
        stream = StringIO("signature")
314
 
        my_store.add(stream, "foo", 'sig')
315
 
        stream = StringIO("content")
316
 
        my_store.add(stream, "foo")
317
 
        stream = StringIO("signature for missing base")
318
 
        my_store.add(stream, "missing", 'sig')
319
 
        return my_store
320
 
        
321
 
    def test_has_simple(self):
322
 
        my_store = self.get_populated_store()
323
 
        self.assertEqual(True, my_store.has_id('foo'))
324
 
        my_store = self.get_populated_store(True)
325
 
        self.assertEqual(True, my_store.has_id('foo'))
326
 
 
327
 
    def test_has_suffixed(self):
328
 
        my_store = self.get_populated_store()
329
 
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
330
 
        my_store = self.get_populated_store(True)
331
 
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
332
 
 
333
 
    def test_has_suffixed_no_base(self):
334
 
        my_store = self.get_populated_store()
335
 
        self.assertEqual(False, my_store.has_id('missing'))
336
 
        my_store = self.get_populated_store(True)
337
 
        self.assertEqual(False, my_store.has_id('missing'))
338
 
 
339
 
    def test_get_simple(self):
340
 
        my_store = self.get_populated_store()
341
 
        self.assertEqual('content', my_store.get('foo').read())
342
 
        my_store = self.get_populated_store(True)
343
 
        self.assertEqual('content', my_store.get('foo').read())
344
 
 
345
 
    def test_get_suffixed(self):
346
 
        my_store = self.get_populated_store()
347
 
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
348
 
        my_store = self.get_populated_store(True)
349
 
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
350
 
 
351
 
    def test_get_suffixed_no_base(self):
352
 
        my_store = self.get_populated_store()
353
 
        self.assertEqual('signature for missing base',
354
 
                         my_store.get('missing', 'sig').read())
355
 
        my_store = self.get_populated_store(True)
356
 
        self.assertEqual('signature for missing base',
357
 
                         my_store.get('missing', 'sig').read())
358
 
 
359
 
    def test___iter__no_suffix(self):
360
 
        my_store = TextStore(MemoryTransport(), False, compressed=False)
361
 
        stream = StringIO("content")
362
 
        my_store.add(stream, "foo")
363
 
        self.assertEqual(set(['foo']),
364
 
                         set(my_store.__iter__()))
365
 
 
366
 
    def test___iter__(self):
367
 
        self.assertEqual(set(['foo']),
368
 
                         set(self.get_populated_store().__iter__()))
369
 
        self.assertEqual(set(['foo']),
370
 
                         set(self.get_populated_store(True).__iter__()))
371
 
 
372
 
    def test___iter__compressed(self):
373
 
        self.assertEqual(set(['foo']),
374
 
                         set(self.get_populated_store(
375
 
                             compressed=True).__iter__()))
376
 
        self.assertEqual(set(['foo']),
377
 
                         set(self.get_populated_store(
378
 
                             True, compressed=True).__iter__()))
379
 
 
380
 
    def test___len__(self):
381
 
        self.assertEqual(1, len(self.get_populated_store()))
382
 
 
383
 
    def test_copy_suffixes(self):
384
 
        from_store = self.get_populated_store()
385
 
        to_store = TextStore(MemoryTransport(), True, compressed=True)
386
 
        to_store.register_suffix('sig')
387
 
        copy_all(from_store, to_store)
388
 
        self.assertEqual(1, len(to_store))
389
 
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
390
 
        self.assertEqual('content', to_store.get('foo').read())
391
 
        self.assertEqual('signature', to_store.get('foo', 'sig').read())
392
 
        self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
393
 
 
394
 
    def test_relpath_escaped(self):
395
 
        my_store = store.TransportStore(MemoryTransport())
396
 
        self.assertEqual('%25', my_store._relpath('%'))