~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_store.py

[merge] John, sftp and others

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 by Canonical Development Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Test Store implementations."""
 
18
 
 
19
from cStringIO import StringIO
 
20
import os
 
21
import gzip
 
22
 
 
23
from bzrlib.errors import BzrError, UnlistableStore
 
24
from bzrlib.store import copy_all
 
25
from bzrlib.transport.local import LocalTransport
 
26
from bzrlib.transport import NoSuchFile
 
27
from bzrlib.store.text import TextStore
 
28
from bzrlib.tests import TestCase, TestCaseInTempDir
 
29
import bzrlib.store as store
 
30
import bzrlib.transport as transport
 
31
from bzrlib.transport.memory import MemoryTransport
 
32
 
 
33
 
 
34
class TestStores(object):
 
35
    """Mixin template class that provides some common tests for stores"""
 
36
 
 
37
    def check_content(self, store, fileid, value):
 
38
        f = store.get(fileid)
 
39
        self.assertEqual(f.read(), value)
 
40
 
 
41
    def fill_store(self, store):
 
42
        store.add(StringIO('hello'), 'a')
 
43
        store.add(StringIO('other'), 'b')
 
44
        store.add(StringIO('something'), 'c')
 
45
        store.add(StringIO('goodbye'), '123123')
 
46
 
 
47
    def test_copy_all(self):
 
48
        """Test copying"""
 
49
        os.mkdir('a')
 
50
        store_a = self.get_store('a')
 
51
        store_a.add('foo', '1')
 
52
        os.mkdir('b')
 
53
        store_b = self.get_store('b')
 
54
        copy_all(store_a, store_b)
 
55
        self.assertEqual(store_a.get('1').read(), 'foo')
 
56
        self.assertEqual(store_b.get('1').read(), 'foo')
 
57
        # TODO: Switch the exception form UnlistableStore to
 
58
        #       or make Stores throw UnlistableStore if their
 
59
        #       Transport doesn't support listing
 
60
        # store_c = RemoteStore('http://example.com/')
 
61
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
 
62
 
 
63
    def test_get(self):
 
64
        store = self.get_store()
 
65
        self.fill_store(store)
 
66
    
 
67
        self.check_content(store, 'a', 'hello')
 
68
        self.check_content(store, 'b', 'other')
 
69
        self.check_content(store, 'c', 'something')
 
70
    
 
71
        # Make sure that requesting a non-existing file fails
 
72
        self.assertRaises(KeyError, self.check_content, store, 'd', None)
 
73
 
 
74
    def test_multiple_add(self):
 
75
        """Multiple add with same ID should raise a BzrError"""
 
76
        store = self.get_store()
 
77
        self.fill_store(store)
 
78
        self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
 
79
 
 
80
 
 
81
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
 
82
 
 
83
    def get_store(self, path=u'.'):
 
84
        t = LocalTransport(path)
 
85
        return TextStore(t, compressed=True)
 
86
 
 
87
    def test_total_size(self):
 
88
        store = self.get_store(u'.')
 
89
        store.register_suffix('dsc')
 
90
        store.add(StringIO('goodbye'), '123123')
 
91
        store.add(StringIO('goodbye2'), '123123', 'dsc')
 
92
        # these get gzipped - content should be stable
 
93
        self.assertEqual(store.total_size(), (2, 55))
 
94
        
 
95
    def test__relpath_suffixed(self):
 
96
        my_store = TextStore(MockTransport(), True, compressed=True)
 
97
        my_store.register_suffix('dsc')
 
98
        self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
 
99
 
 
100
 
 
101
class TestMemoryStore(TestCase):
 
102
    
 
103
    def get_store(self):
 
104
        return store.ImmutableMemoryStore()
 
105
    
 
106
    def test_imports(self):
 
107
        from bzrlib.store import ImmutableMemoryStore
 
108
 
 
109
    def test_add_and_retrieve(self):
 
110
        store = self.get_store()
 
111
        store.add(StringIO('hello'), 'aa')
 
112
        self.assertNotEqual(store.get('aa'), None)
 
113
        self.assertEqual(store.get('aa').read(), 'hello')
 
114
        store.add(StringIO('hello world'), 'bb')
 
115
        self.assertNotEqual(store.get('bb'), None)
 
116
        self.assertEqual(store.get('bb').read(), 'hello world')
 
117
 
 
118
    def test_missing_is_absent(self):
 
119
        store = self.get_store()
 
120
        self.failIf('aa' in store)
 
121
 
 
122
    def test_adding_fails_when_present(self):
 
123
        my_store = self.get_store()
 
124
        my_store.add(StringIO('hello'), 'aa')
 
125
        self.assertRaises(BzrError,
 
126
                          my_store.add, StringIO('hello'), 'aa')
 
127
 
 
128
    def test_total_size(self):
 
129
        store = self.get_store()
 
130
        store.add(StringIO('goodbye'), '123123')
 
131
        store.add(StringIO('goodbye2'), '123123.dsc')
 
132
        self.assertEqual(store.total_size(), (2, 15))
 
133
        # TODO: Switch the exception form UnlistableStore to
 
134
        #       or make Stores throw UnlistableStore if their
 
135
        #       Transport doesn't support listing
 
136
        # store_c = RemoteStore('http://example.com/')
 
137
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
 
138
 
 
139
 
 
140
class TestTextStore(TestCaseInTempDir, TestStores):
 
141
 
 
142
    def get_store(self, path=u'.'):
 
143
        t = LocalTransport(path)
 
144
        return TextStore(t, compressed=False)
 
145
 
 
146
    def test_total_size(self):
 
147
        store = self.get_store()
 
148
        store.add(StringIO('goodbye'), '123123')
 
149
        store.add(StringIO('goodbye2'), '123123.dsc')
 
150
        self.assertEqual(store.total_size(), (2, 15))
 
151
        # TODO: Switch the exception form UnlistableStore to
 
152
        #       or make Stores throw UnlistableStore if their
 
153
        #       Transport doesn't support listing
 
154
        # store_c = RemoteStore('http://example.com/')
 
155
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
 
156
 
 
157
 
 
158
class TestMixedTextStore(TestCaseInTempDir, TestStores):
 
159
 
 
160
    def get_store(self, path=u'.', compressed=True):
 
161
        t = LocalTransport(path)
 
162
        return TextStore(t, compressed=compressed)
 
163
 
 
164
    def test_get_mixed(self):
 
165
        cs = self.get_store(u'.', compressed=True)
 
166
        s = self.get_store(u'.', compressed=False)
 
167
        cs.add(StringIO('hello there'), 'a')
 
168
 
 
169
        self.failUnlessExists('a.gz')
 
170
        self.failIf(os.path.lexists('a'))
 
171
 
 
172
        self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
 
173
 
 
174
        self.assertEquals(cs.has_id('a'), True)
 
175
        self.assertEquals(s.has_id('a'), True)
 
176
        self.assertEquals(cs.get('a').read(), 'hello there')
 
177
        self.assertEquals(s.get('a').read(), 'hello there')
 
178
        
 
179
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
 
180
 
 
181
        s.add(StringIO('goodbye'), 'b')
 
182
        self.failUnlessExists('b')
 
183
        self.failIf(os.path.lexists('b.gz'))
 
184
        self.assertEquals(open('b').read(), 'goodbye')
 
185
 
 
186
        self.assertEquals(cs.has_id('b'), True)
 
187
        self.assertEquals(s.has_id('b'), True)
 
188
        self.assertEquals(cs.get('b').read(), 'goodbye')
 
189
        self.assertEquals(s.get('b').read(), 'goodbye')
 
190
        
 
191
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
 
192
 
 
193
class MockTransport(transport.Transport):
 
194
    """A fake transport for testing with."""
 
195
 
 
196
    def has(self, filename):
 
197
        return False
 
198
 
 
199
    def __init__(self, url=None):
 
200
        if url is None:
 
201
            url = "http://example.com"
 
202
        super(MockTransport, self).__init__(url)
 
203
 
 
204
    def mkdir(self, filename):
 
205
        return
 
206
 
 
207
 
 
208
class InstrumentedTransportStore(store.TransportStore):
 
209
    """An instrumented TransportStore.
 
210
 
 
211
    Here we replace template method worker methods with calls that record the
 
212
    expected results.
 
213
    """
 
214
 
 
215
    def _add(self, filename, file):
 
216
        self._calls.append(("_add", filename, file))
 
217
 
 
218
    def __init__(self, transport, prefixed=False):
 
219
        super(InstrumentedTransportStore, self).__init__(transport, prefixed)
 
220
        self._calls = []
 
221
 
 
222
 
 
223
class TestInstrumentedTransportStore(TestCase):
 
224
 
 
225
    def test__add_records(self):
 
226
        my_store = InstrumentedTransportStore(MockTransport())
 
227
        my_store._add("filename", "file")
 
228
        self.assertEqual([("_add", "filename", "file")], my_store._calls)
 
229
 
 
230
 
 
231
class TestMockTransport(TestCase):
 
232
 
 
233
    def test_isinstance(self):
 
234
        self.failUnless(isinstance(MockTransport(), transport.Transport))
 
235
 
 
236
    def test_has(self):
 
237
        self.assertEqual(False, MockTransport().has('foo'))
 
238
 
 
239
    def test_mkdir(self):
 
240
        MockTransport().mkdir('45')
 
241
 
 
242
 
 
243
class TestTransportStore(TestCase):
 
244
    
 
245
    def test__relpath_invalid(self):
 
246
        my_store = store.TransportStore(MockTransport())
 
247
        self.assertRaises(ValueError, my_store._relpath, '/foo')
 
248
        self.assertRaises(ValueError, my_store._relpath, 'foo/')
 
249
 
 
250
    def test_register_invalid_suffixes(self):
 
251
        my_store = store.TransportStore(MockTransport())
 
252
        self.assertRaises(ValueError, my_store.register_suffix, '/')
 
253
        self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
 
254
 
 
255
    def test__relpath_unregister_suffixes(self):
 
256
        my_store = store.TransportStore(MockTransport())
 
257
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
 
258
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
 
259
 
 
260
    def test__relpath_simple(self):
 
261
        my_store = store.TransportStore(MockTransport())
 
262
        self.assertEqual("foo", my_store._relpath('foo'))
 
263
 
 
264
    def test__relpath_prefixed(self):
 
265
        my_store = store.TransportStore(MockTransport(), True)
 
266
        self.assertEqual('45/foo', my_store._relpath('foo'))
 
267
 
 
268
    def test__relpath_simple_suffixed(self):
 
269
        my_store = store.TransportStore(MockTransport())
 
270
        my_store.register_suffix('bar')
 
271
        my_store.register_suffix('baz')
 
272
        self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
 
273
        self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
 
274
 
 
275
    def test__relpath_prefixed_suffixed(self):
 
276
        my_store = store.TransportStore(MockTransport(), True)
 
277
        my_store.register_suffix('bar')
 
278
        my_store.register_suffix('baz')
 
279
        self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
 
280
        self.assertEqual('45/foo.bar.baz',
 
281
                         my_store._relpath('foo', ['bar', 'baz']))
 
282
 
 
283
    def test_add_simple(self):
 
284
        stream = StringIO("content")
 
285
        my_store = InstrumentedTransportStore(MockTransport())
 
286
        my_store.add(stream, "foo")
 
287
        self.assertEqual([("_add", "foo", stream)], my_store._calls)
 
288
 
 
289
    def test_add_prefixed(self):
 
290
        stream = StringIO("content")
 
291
        my_store = InstrumentedTransportStore(MockTransport(), True)
 
292
        my_store.add(stream, "foo")
 
293
        self.assertEqual([("_add", "45/foo", stream)], my_store._calls)
 
294
 
 
295
    def test_add_simple_suffixed(self):
 
296
        stream = StringIO("content")
 
297
        my_store = InstrumentedTransportStore(MockTransport())
 
298
        my_store.register_suffix('dsc')
 
299
        my_store.add(stream, "foo", 'dsc')
 
300
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
 
301
        
 
302
    def test_add_simple_suffixed(self):
 
303
        stream = StringIO("content")
 
304
        my_store = InstrumentedTransportStore(MockTransport(), True)
 
305
        my_store.register_suffix('dsc')
 
306
        my_store.add(stream, "foo", 'dsc')
 
307
        self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
 
308
 
 
309
    def get_populated_store(self, prefixed=False,
 
310
            store_class=TextStore, compressed=False):
 
311
        my_store = store_class(MemoryTransport(), prefixed,
 
312
                               compressed=compressed)
 
313
        my_store.register_suffix('sig')
 
314
        stream = StringIO("signature")
 
315
        my_store.add(stream, "foo", 'sig')
 
316
        stream = StringIO("content")
 
317
        my_store.add(stream, "foo")
 
318
        stream = StringIO("signature for missing base")
 
319
        my_store.add(stream, "missing", 'sig')
 
320
        return my_store
 
321
        
 
322
    def test_has_simple(self):
 
323
        my_store = self.get_populated_store()
 
324
        self.assertEqual(True, my_store.has_id('foo'))
 
325
        my_store = self.get_populated_store(True)
 
326
        self.assertEqual(True, my_store.has_id('foo'))
 
327
 
 
328
    def test_has_suffixed(self):
 
329
        my_store = self.get_populated_store()
 
330
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
 
331
        my_store = self.get_populated_store(True)
 
332
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
 
333
 
 
334
    def test_has_suffixed_no_base(self):
 
335
        my_store = self.get_populated_store()
 
336
        self.assertEqual(False, my_store.has_id('missing'))
 
337
        my_store = self.get_populated_store(True)
 
338
        self.assertEqual(False, my_store.has_id('missing'))
 
339
 
 
340
    def test_get_simple(self):
 
341
        my_store = self.get_populated_store()
 
342
        self.assertEqual('content', my_store.get('foo').read())
 
343
        my_store = self.get_populated_store(True)
 
344
        self.assertEqual('content', my_store.get('foo').read())
 
345
 
 
346
    def test_get_suffixed(self):
 
347
        my_store = self.get_populated_store()
 
348
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
 
349
        my_store = self.get_populated_store(True)
 
350
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
 
351
 
 
352
    def test_get_suffixed_no_base(self):
 
353
        my_store = self.get_populated_store()
 
354
        self.assertEqual('signature for missing base',
 
355
                         my_store.get('missing', 'sig').read())
 
356
        my_store = self.get_populated_store(True)
 
357
        self.assertEqual('signature for missing base',
 
358
                         my_store.get('missing', 'sig').read())
 
359
 
 
360
    def test___iter__no_suffix(self):
 
361
        my_store = TextStore(MemoryTransport(), False, compressed=False)
 
362
        stream = StringIO("content")
 
363
        my_store.add(stream, "foo")
 
364
        self.assertEqual(set(['foo']),
 
365
                         set(my_store.__iter__()))
 
366
 
 
367
    def test___iter__(self):
 
368
        self.assertEqual(set(['foo']),
 
369
                         set(self.get_populated_store().__iter__()))
 
370
        self.assertEqual(set(['foo']),
 
371
                         set(self.get_populated_store(True).__iter__()))
 
372
 
 
373
    def test___iter__compressed(self):
 
374
        self.assertEqual(set(['foo']),
 
375
                         set(self.get_populated_store(
 
376
                             compressed=True).__iter__()))
 
377
        self.assertEqual(set(['foo']),
 
378
                         set(self.get_populated_store(
 
379
                             True, compressed=True).__iter__()))
 
380
 
 
381
    def test___len__(self):
 
382
        self.assertEqual(1, len(self.get_populated_store()))
 
383
 
 
384
    def test_copy_suffixes(self):
 
385
        from_store = self.get_populated_store()
 
386
        to_store = TextStore(MemoryTransport(), True, compressed=True)
 
387
        to_store.register_suffix('sig')
 
388
        copy_all(from_store, to_store)
 
389
        self.assertEqual(1, len(to_store))
 
390
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
 
391
        self.assertEqual('content', to_store.get('foo').read())
 
392
        self.assertEqual('signature', to_store.get('foo', 'sig').read())
 
393
        self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
 
394
 
 
395
    def test_relpath_escaped(self):
 
396
        my_store = store.TransportStore(MemoryTransport())
 
397
        self.assertEqual('%25', my_store._relpath('%'))