~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_store.py

  • Committer: Martin Pool
  • Date: 2005-04-15 03:01:33 UTC
  • Revision ID: mbp@sourcefrog.net-20050415030133-0ce77498bbab063f
- cleanup

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Test Store implementations."""
18
 
 
19
 
from cStringIO import StringIO
20
 
import os
21
 
import gzip
22
 
 
23
 
import bzrlib.errors as errors
24
 
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
25
 
from bzrlib.transport.local import LocalTransport
26
 
from bzrlib.store.text import TextStore
27
 
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
28
 
import bzrlib.store as store
29
 
import bzrlib.transactions as transactions
30
 
import bzrlib.transport as transport
31
 
from bzrlib.transport.memory import MemoryTransport
32
 
 
33
 
 
34
 
class TestStores(object):
35
 
    """Mixin template class that provides some common tests for stores"""
36
 
 
37
 
    def check_content(self, store, fileid, value):
38
 
        f = store.get(fileid)
39
 
        self.assertEqual(f.read(), value)
40
 
 
41
 
    def test_add_str_deprecated(self):
42
 
        os.mkdir('a')
43
 
        store = self.get_store('a')
44
 
        self.callDeprecated(['Passing a string to Store.add'
45
 
                             ' was deprecated in version 0.11.'],
46
 
                            store.add, 'foo', '1')
47
 
        self.assertEqual('foo', store.get('1').read())
48
 
 
49
 
    def fill_store(self, store):
50
 
        store.add(StringIO('hello'), 'a')
51
 
        store.add(StringIO('other'), 'b')
52
 
        store.add(StringIO('something'), 'c')
53
 
        store.add(StringIO('goodbye'), '123123')
54
 
 
55
 
    def test_copy_all(self):
56
 
        """Test copying"""
57
 
        os.mkdir('a')
58
 
        store_a = self.get_store('a')
59
 
        store_a.add(StringIO('foo'), '1')
60
 
        os.mkdir('b')
61
 
        store_b = self.get_store('b')
62
 
        store_b.copy_all_ids(store_a)
63
 
        self.assertEqual(store_a.get('1').read(), 'foo')
64
 
        self.assertEqual(store_b.get('1').read(), 'foo')
65
 
        # TODO: Switch the exception form UnlistableStore to
66
 
        #       or make Stores throw UnlistableStore if their
67
 
        #       Transport doesn't support listing
68
 
        # store_c = RemoteStore('http://example.com/')
69
 
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
70
 
 
71
 
    def test_get(self):
72
 
        store = self.get_store()
73
 
        self.fill_store(store)
74
 
    
75
 
        self.check_content(store, 'a', 'hello')
76
 
        self.check_content(store, 'b', 'other')
77
 
        self.check_content(store, 'c', 'something')
78
 
    
79
 
        # Make sure that requesting a non-existing file fails
80
 
        self.assertRaises(KeyError, self.check_content, store, 'd', None)
81
 
 
82
 
    def test_multiple_add(self):
83
 
        """Multiple add with same ID should raise a BzrError"""
84
 
        store = self.get_store()
85
 
        self.fill_store(store)
86
 
        self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
87
 
 
88
 
 
89
 
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
90
 
 
91
 
    def get_store(self, path=u'.'):
92
 
        t = transport.get_transport(path)
93
 
        return TextStore(t, compressed=True)
94
 
 
95
 
    def test_total_size(self):
96
 
        store = self.get_store(u'.')
97
 
        store.register_suffix('dsc')
98
 
        store.add(StringIO('goodbye'), '123123')
99
 
        store.add(StringIO('goodbye2'), '123123', 'dsc')
100
 
        # these get gzipped - content should be stable
101
 
        self.assertEqual(store.total_size(), (2, 55))
102
 
        
103
 
    def test__relpath_suffixed(self):
104
 
        my_store = TextStore(MockTransport(),
105
 
                             prefixed=True, compressed=True)
106
 
        my_store.register_suffix('dsc')
107
 
        self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
108
 
 
109
 
 
110
 
class TestMemoryStore(TestCase):
111
 
    
112
 
    def get_store(self):
113
 
        return TextStore(MemoryTransport())
114
 
    
115
 
    def test_add_and_retrieve(self):
116
 
        store = self.get_store()
117
 
        store.add(StringIO('hello'), 'aa')
118
 
        self.assertNotEqual(store.get('aa'), None)
119
 
        self.assertEqual(store.get('aa').read(), 'hello')
120
 
        store.add(StringIO('hello world'), 'bb')
121
 
        self.assertNotEqual(store.get('bb'), None)
122
 
        self.assertEqual(store.get('bb').read(), 'hello world')
123
 
 
124
 
    def test_missing_is_absent(self):
125
 
        store = self.get_store()
126
 
        self.failIf('aa' in store)
127
 
 
128
 
    def test_adding_fails_when_present(self):
129
 
        my_store = self.get_store()
130
 
        my_store.add(StringIO('hello'), 'aa')
131
 
        self.assertRaises(BzrError,
132
 
                          my_store.add, StringIO('hello'), 'aa')
133
 
 
134
 
    def test_total_size(self):
135
 
        store = self.get_store()
136
 
        store.add(StringIO('goodbye'), '123123')
137
 
        store.add(StringIO('goodbye2'), '123123.dsc')
138
 
        self.assertEqual(store.total_size(), (2, 15))
139
 
        # TODO: Switch the exception form UnlistableStore to
140
 
        #       or make Stores throw UnlistableStore if their
141
 
        #       Transport doesn't support listing
142
 
        # store_c = RemoteStore('http://example.com/')
143
 
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
144
 
 
145
 
 
146
 
class TestTextStore(TestCaseInTempDir, TestStores):
147
 
 
148
 
    def get_store(self, path=u'.'):
149
 
        t = transport.get_transport(path)
150
 
        return TextStore(t, compressed=False)
151
 
 
152
 
    def test_total_size(self):
153
 
        store = self.get_store()
154
 
        store.add(StringIO('goodbye'), '123123')
155
 
        store.add(StringIO('goodbye2'), '123123.dsc')
156
 
        self.assertEqual(store.total_size(), (2, 15))
157
 
        # TODO: Switch the exception form UnlistableStore to
158
 
        #       or make Stores throw UnlistableStore if their
159
 
        #       Transport doesn't support listing
160
 
        # store_c = RemoteStore('http://example.com/')
161
 
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
162
 
 
163
 
 
164
 
class TestMixedTextStore(TestCaseInTempDir, TestStores):
165
 
 
166
 
    def get_store(self, path=u'.', compressed=True):
167
 
        t = transport.get_transport(path)
168
 
        return TextStore(t, compressed=compressed)
169
 
 
170
 
    def test_get_mixed(self):
171
 
        cs = self.get_store(u'.', compressed=True)
172
 
        s = self.get_store(u'.', compressed=False)
173
 
        cs.add(StringIO('hello there'), 'a')
174
 
 
175
 
        self.failUnlessExists('a.gz')
176
 
        self.failIf(os.path.lexists('a'))
177
 
 
178
 
        self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
179
 
 
180
 
        self.assertEquals(cs.has_id('a'), True)
181
 
        self.assertEquals(s.has_id('a'), True)
182
 
        self.assertEquals(cs.get('a').read(), 'hello there')
183
 
        self.assertEquals(s.get('a').read(), 'hello there')
184
 
        
185
 
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
186
 
 
187
 
        s.add(StringIO('goodbye'), 'b')
188
 
        self.failUnlessExists('b')
189
 
        self.failIf(os.path.lexists('b.gz'))
190
 
        self.assertEquals(open('b').read(), 'goodbye')
191
 
 
192
 
        self.assertEquals(cs.has_id('b'), True)
193
 
        self.assertEquals(s.has_id('b'), True)
194
 
        self.assertEquals(cs.get('b').read(), 'goodbye')
195
 
        self.assertEquals(s.get('b').read(), 'goodbye')
196
 
        
197
 
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
198
 
 
199
 
class MockTransport(transport.Transport):
200
 
    """A fake transport for testing with."""
201
 
 
202
 
    def has(self, filename):
203
 
        return False
204
 
 
205
 
    def __init__(self, url=None):
206
 
        if url is None:
207
 
            url = "http://example.com"
208
 
        super(MockTransport, self).__init__(url)
209
 
 
210
 
    def mkdir(self, filename):
211
 
        return
212
 
 
213
 
 
214
 
class InstrumentedTransportStore(store.TransportStore):
215
 
    """An instrumented TransportStore.
216
 
 
217
 
    Here we replace template method worker methods with calls that record the
218
 
    expected results.
219
 
    """
220
 
 
221
 
    def _add(self, filename, file):
222
 
        self._calls.append(("_add", filename, file))
223
 
 
224
 
    def __init__(self, transport, prefixed=False):
225
 
        super(InstrumentedTransportStore, self).__init__(transport, prefixed)
226
 
        self._calls = []
227
 
 
228
 
 
229
 
class TestInstrumentedTransportStore(TestCase):
230
 
 
231
 
    def test__add_records(self):
232
 
        my_store = InstrumentedTransportStore(MockTransport())
233
 
        my_store._add("filename", "file")
234
 
        self.assertEqual([("_add", "filename", "file")], my_store._calls)
235
 
 
236
 
 
237
 
class TestMockTransport(TestCase):
238
 
 
239
 
    def test_isinstance(self):
240
 
        self.failUnless(isinstance(MockTransport(), transport.Transport))
241
 
 
242
 
    def test_has(self):
243
 
        self.assertEqual(False, MockTransport().has('foo'))
244
 
 
245
 
    def test_mkdir(self):
246
 
        MockTransport().mkdir('45')
247
 
 
248
 
 
249
 
class TestTransportStore(TestCase):
250
 
    
251
 
    def test__relpath_invalid(self):
252
 
        my_store = store.TransportStore(MockTransport())
253
 
        self.assertRaises(ValueError, my_store._relpath, '/foo')
254
 
        self.assertRaises(ValueError, my_store._relpath, 'foo/')
255
 
 
256
 
    def test_register_invalid_suffixes(self):
257
 
        my_store = store.TransportStore(MockTransport())
258
 
        self.assertRaises(ValueError, my_store.register_suffix, '/')
259
 
        self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
260
 
 
261
 
    def test__relpath_unregister_suffixes(self):
262
 
        my_store = store.TransportStore(MockTransport())
263
 
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
264
 
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
265
 
 
266
 
    def test__relpath_simple(self):
267
 
        my_store = store.TransportStore(MockTransport())
268
 
        self.assertEqual("foo", my_store._relpath('foo'))
269
 
 
270
 
    def test__relpath_prefixed(self):
271
 
        my_store = store.TransportStore(MockTransport(), True)
272
 
        self.assertEqual('45/foo', my_store._relpath('foo'))
273
 
 
274
 
    def test__relpath_simple_suffixed(self):
275
 
        my_store = store.TransportStore(MockTransport())
276
 
        my_store.register_suffix('bar')
277
 
        my_store.register_suffix('baz')
278
 
        self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
279
 
        self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
280
 
 
281
 
    def test__relpath_prefixed_suffixed(self):
282
 
        my_store = store.TransportStore(MockTransport(), True)
283
 
        my_store.register_suffix('bar')
284
 
        my_store.register_suffix('baz')
285
 
        self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
286
 
        self.assertEqual('45/foo.bar.baz',
287
 
                         my_store._relpath('foo', ['bar', 'baz']))
288
 
 
289
 
    def test_add_simple(self):
290
 
        stream = StringIO("content")
291
 
        my_store = InstrumentedTransportStore(MockTransport())
292
 
        my_store.add(stream, "foo")
293
 
        self.assertEqual([("_add", "foo", stream)], my_store._calls)
294
 
 
295
 
    def test_add_prefixed(self):
296
 
        stream = StringIO("content")
297
 
        my_store = InstrumentedTransportStore(MockTransport(), True)
298
 
        my_store.add(stream, "foo")
299
 
        self.assertEqual([("_add", "45/foo", stream)], my_store._calls)
300
 
 
301
 
    def test_add_simple_suffixed(self):
302
 
        stream = StringIO("content")
303
 
        my_store = InstrumentedTransportStore(MockTransport())
304
 
        my_store.register_suffix('dsc')
305
 
        my_store.add(stream, "foo", 'dsc')
306
 
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
307
 
        
308
 
    def test_add_simple_suffixed(self):
309
 
        stream = StringIO("content")
310
 
        my_store = InstrumentedTransportStore(MockTransport(), True)
311
 
        my_store.register_suffix('dsc')
312
 
        my_store.add(stream, "foo", 'dsc')
313
 
        self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
314
 
 
315
 
    def get_populated_store(self, prefixed=False,
316
 
            store_class=TextStore, compressed=False):
317
 
        my_store = store_class(MemoryTransport(), prefixed,
318
 
                               compressed=compressed)
319
 
        my_store.register_suffix('sig')
320
 
        stream = StringIO("signature")
321
 
        my_store.add(stream, "foo", 'sig')
322
 
        stream = StringIO("content")
323
 
        my_store.add(stream, "foo")
324
 
        stream = StringIO("signature for missing base")
325
 
        my_store.add(stream, "missing", 'sig')
326
 
        return my_store
327
 
        
328
 
    def test_has_simple(self):
329
 
        my_store = self.get_populated_store()
330
 
        self.assertEqual(True, my_store.has_id('foo'))
331
 
        my_store = self.get_populated_store(True)
332
 
        self.assertEqual(True, my_store.has_id('foo'))
333
 
 
334
 
    def test_has_suffixed(self):
335
 
        my_store = self.get_populated_store()
336
 
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
337
 
        my_store = self.get_populated_store(True)
338
 
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
339
 
 
340
 
    def test_has_suffixed_no_base(self):
341
 
        my_store = self.get_populated_store()
342
 
        self.assertEqual(False, my_store.has_id('missing'))
343
 
        my_store = self.get_populated_store(True)
344
 
        self.assertEqual(False, my_store.has_id('missing'))
345
 
 
346
 
    def test_get_simple(self):
347
 
        my_store = self.get_populated_store()
348
 
        self.assertEqual('content', my_store.get('foo').read())
349
 
        my_store = self.get_populated_store(True)
350
 
        self.assertEqual('content', my_store.get('foo').read())
351
 
 
352
 
    def test_get_suffixed(self):
353
 
        my_store = self.get_populated_store()
354
 
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
355
 
        my_store = self.get_populated_store(True)
356
 
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
357
 
 
358
 
    def test_get_suffixed_no_base(self):
359
 
        my_store = self.get_populated_store()
360
 
        self.assertEqual('signature for missing base',
361
 
                         my_store.get('missing', 'sig').read())
362
 
        my_store = self.get_populated_store(True)
363
 
        self.assertEqual('signature for missing base',
364
 
                         my_store.get('missing', 'sig').read())
365
 
 
366
 
    def test___iter__no_suffix(self):
367
 
        my_store = TextStore(MemoryTransport(),
368
 
                             prefixed=False, compressed=False)
369
 
        stream = StringIO("content")
370
 
        my_store.add(stream, "foo")
371
 
        self.assertEqual(set(['foo']),
372
 
                         set(my_store.__iter__()))
373
 
 
374
 
    def test___iter__(self):
375
 
        self.assertEqual(set(['foo']),
376
 
                         set(self.get_populated_store().__iter__()))
377
 
        self.assertEqual(set(['foo']),
378
 
                         set(self.get_populated_store(True).__iter__()))
379
 
 
380
 
    def test___iter__compressed(self):
381
 
        self.assertEqual(set(['foo']),
382
 
                         set(self.get_populated_store(
383
 
                             compressed=True).__iter__()))
384
 
        self.assertEqual(set(['foo']),
385
 
                         set(self.get_populated_store(
386
 
                             True, compressed=True).__iter__()))
387
 
 
388
 
    def test___len__(self):
389
 
        self.assertEqual(1, len(self.get_populated_store()))
390
 
 
391
 
    def test_copy_suffixes(self):
392
 
        from_store = self.get_populated_store()
393
 
        to_store = TextStore(MemoryTransport(),
394
 
                             prefixed=True, compressed=True)
395
 
        to_store.register_suffix('sig')
396
 
        to_store.copy_all_ids(from_store)
397
 
        self.assertEqual(1, len(to_store))
398
 
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
399
 
        self.assertEqual('content', to_store.get('foo').read())
400
 
        self.assertEqual('signature', to_store.get('foo', 'sig').read())
401
 
        self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
402
 
 
403
 
    def test_relpath_escaped(self):
404
 
        my_store = store.TransportStore(MemoryTransport())
405
 
        self.assertEqual('%25', my_store._relpath('%'))
406
 
 
407
 
    def test_escaped_uppercase(self):
408
 
        """Uppercase letters are escaped for safety on Windows"""
409
 
        my_store = store.TransportStore(MemoryTransport(), escaped=True)
410
 
        # a particularly perverse file-id! :-)
411
 
        self.assertEquals(my_store._escape_file_id('C:<>'), '%43%3a%3c%3e')
412
 
 
413
 
 
414
 
class TestVersionFileStore(TestCaseWithTransport):
415
 
 
416
 
    def setUp(self):
417
 
        super(TestVersionFileStore, self).setUp()
418
 
        self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
419
 
 
420
 
    def test_get_weave_registers_dirty_in_write(self):
421
 
        transaction = transactions.WriteTransaction()
422
 
        vf = self.vfstore.get_weave_or_empty('id', transaction)
423
 
        transaction.finish()
424
 
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
425
 
        transaction = transactions.WriteTransaction()
426
 
        vf = self.vfstore.get_weave('id', transaction)
427
 
        transaction.finish()
428
 
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
429
 
 
430
 
    def test_get_weave_or_empty_readonly_fails(self):
431
 
        transaction = transactions.ReadOnlyTransaction()
432
 
        vf = self.assertRaises(errors.ReadOnlyError,
433
 
                               self.vfstore.get_weave_or_empty,
434
 
                               'id',
435
 
                               transaction)
436
 
 
437
 
    def test_get_weave_readonly_cant_write(self):
438
 
        transaction = transactions.WriteTransaction()
439
 
        vf = self.vfstore.get_weave_or_empty('id', transaction)
440
 
        transaction.finish()
441
 
        transaction = transactions.ReadOnlyTransaction()
442
 
        vf = self.vfstore.get_weave_or_empty('id', transaction)
443
 
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
444