~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_store.py

  • Committer: Martin Pool
  • Date: 2005-05-09 03:03:55 UTC
  • Revision ID: mbp@sourcefrog.net-20050509030355-ad6ab558d1362959
- Don't give an error if the trace file can't be opened

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Development Ltd
2
 
 
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
 
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
 
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Test Store implementations."""
18
 
 
19
 
from cStringIO import StringIO
20
 
import os
21
 
import gzip
22
 
 
23
 
import bzrlib.errors as errors
24
 
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
25
 
from bzrlib.transport.local import LocalTransport
26
 
from bzrlib.store.text import TextStore
27
 
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
28
 
import bzrlib.store as store
29
 
import bzrlib.transactions as transactions
30
 
import bzrlib.transport as transport
31
 
from bzrlib.transport.memory import MemoryTransport
32
 
 
33
 
 
34
 
class TestStores(object):
35
 
    """Mixin template class that provides some common tests for stores"""
36
 
 
37
 
    def check_content(self, store, fileid, value):
38
 
        f = store.get(fileid)
39
 
        self.assertEqual(f.read(), value)
40
 
 
41
 
    def fill_store(self, store):
42
 
        store.add(StringIO('hello'), 'a')
43
 
        store.add(StringIO('other'), 'b')
44
 
        store.add(StringIO('something'), 'c')
45
 
        store.add(StringIO('goodbye'), '123123')
46
 
 
47
 
    def test_copy_all(self):
48
 
        """Test copying"""
49
 
        os.mkdir('a')
50
 
        store_a = self.get_store('a')
51
 
        store_a.add('foo', '1')
52
 
        os.mkdir('b')
53
 
        store_b = self.get_store('b')
54
 
        store_b.copy_all_ids(store_a)
55
 
        self.assertEqual(store_a.get('1').read(), 'foo')
56
 
        self.assertEqual(store_b.get('1').read(), 'foo')
57
 
        # TODO: Switch the exception form UnlistableStore to
58
 
        #       or make Stores throw UnlistableStore if their
59
 
        #       Transport doesn't support listing
60
 
        # store_c = RemoteStore('http://example.com/')
61
 
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
62
 
 
63
 
    def test_get(self):
64
 
        store = self.get_store()
65
 
        self.fill_store(store)
66
 
    
67
 
        self.check_content(store, 'a', 'hello')
68
 
        self.check_content(store, 'b', 'other')
69
 
        self.check_content(store, 'c', 'something')
70
 
    
71
 
        # Make sure that requesting a non-existing file fails
72
 
        self.assertRaises(KeyError, self.check_content, store, 'd', None)
73
 
 
74
 
    def test_multiple_add(self):
75
 
        """Multiple add with same ID should raise a BzrError"""
76
 
        store = self.get_store()
77
 
        self.fill_store(store)
78
 
        self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
79
 
 
80
 
 
81
 
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
82
 
 
83
 
    def get_store(self, path=u'.'):
84
 
        t = LocalTransport(path)
85
 
        return TextStore(t, compressed=True)
86
 
 
87
 
    def test_total_size(self):
88
 
        store = self.get_store(u'.')
89
 
        store.register_suffix('dsc')
90
 
        store.add(StringIO('goodbye'), '123123')
91
 
        store.add(StringIO('goodbye2'), '123123', 'dsc')
92
 
        # these get gzipped - content should be stable
93
 
        self.assertEqual(store.total_size(), (2, 55))
94
 
        
95
 
    def test__relpath_suffixed(self):
96
 
        my_store = TextStore(MockTransport(),
97
 
                             prefixed=True, compressed=True)
98
 
        my_store.register_suffix('dsc')
99
 
        self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
100
 
 
101
 
 
102
 
class TestMemoryStore(TestCase):
103
 
    
104
 
    def get_store(self):
105
 
        return store.ImmutableMemoryStore()
106
 
    
107
 
    def test_imports(self):
108
 
        from bzrlib.store import ImmutableMemoryStore
109
 
 
110
 
    def test_add_and_retrieve(self):
111
 
        store = self.get_store()
112
 
        store.add(StringIO('hello'), 'aa')
113
 
        self.assertNotEqual(store.get('aa'), None)
114
 
        self.assertEqual(store.get('aa').read(), 'hello')
115
 
        store.add(StringIO('hello world'), 'bb')
116
 
        self.assertNotEqual(store.get('bb'), None)
117
 
        self.assertEqual(store.get('bb').read(), 'hello world')
118
 
 
119
 
    def test_missing_is_absent(self):
120
 
        store = self.get_store()
121
 
        self.failIf('aa' in store)
122
 
 
123
 
    def test_adding_fails_when_present(self):
124
 
        my_store = self.get_store()
125
 
        my_store.add(StringIO('hello'), 'aa')
126
 
        self.assertRaises(BzrError,
127
 
                          my_store.add, StringIO('hello'), 'aa')
128
 
 
129
 
    def test_total_size(self):
130
 
        store = self.get_store()
131
 
        store.add(StringIO('goodbye'), '123123')
132
 
        store.add(StringIO('goodbye2'), '123123.dsc')
133
 
        self.assertEqual(store.total_size(), (2, 15))
134
 
        # TODO: Switch the exception form UnlistableStore to
135
 
        #       or make Stores throw UnlistableStore if their
136
 
        #       Transport doesn't support listing
137
 
        # store_c = RemoteStore('http://example.com/')
138
 
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
139
 
 
140
 
 
141
 
class TestTextStore(TestCaseInTempDir, TestStores):
142
 
 
143
 
    def get_store(self, path=u'.'):
144
 
        t = LocalTransport(path)
145
 
        return TextStore(t, compressed=False)
146
 
 
147
 
    def test_total_size(self):
148
 
        store = self.get_store()
149
 
        store.add(StringIO('goodbye'), '123123')
150
 
        store.add(StringIO('goodbye2'), '123123.dsc')
151
 
        self.assertEqual(store.total_size(), (2, 15))
152
 
        # TODO: Switch the exception form UnlistableStore to
153
 
        #       or make Stores throw UnlistableStore if their
154
 
        #       Transport doesn't support listing
155
 
        # store_c = RemoteStore('http://example.com/')
156
 
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
157
 
 
158
 
 
159
 
class TestMixedTextStore(TestCaseInTempDir, TestStores):
160
 
 
161
 
    def get_store(self, path=u'.', compressed=True):
162
 
        t = LocalTransport(path)
163
 
        return TextStore(t, compressed=compressed)
164
 
 
165
 
    def test_get_mixed(self):
166
 
        cs = self.get_store(u'.', compressed=True)
167
 
        s = self.get_store(u'.', compressed=False)
168
 
        cs.add(StringIO('hello there'), 'a')
169
 
 
170
 
        self.failUnlessExists('a.gz')
171
 
        self.failIf(os.path.lexists('a'))
172
 
 
173
 
        self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
174
 
 
175
 
        self.assertEquals(cs.has_id('a'), True)
176
 
        self.assertEquals(s.has_id('a'), True)
177
 
        self.assertEquals(cs.get('a').read(), 'hello there')
178
 
        self.assertEquals(s.get('a').read(), 'hello there')
179
 
        
180
 
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
181
 
 
182
 
        s.add(StringIO('goodbye'), 'b')
183
 
        self.failUnlessExists('b')
184
 
        self.failIf(os.path.lexists('b.gz'))
185
 
        self.assertEquals(open('b').read(), 'goodbye')
186
 
 
187
 
        self.assertEquals(cs.has_id('b'), True)
188
 
        self.assertEquals(s.has_id('b'), True)
189
 
        self.assertEquals(cs.get('b').read(), 'goodbye')
190
 
        self.assertEquals(s.get('b').read(), 'goodbye')
191
 
        
192
 
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
193
 
 
194
 
class MockTransport(transport.Transport):
195
 
    """A fake transport for testing with."""
196
 
 
197
 
    def has(self, filename):
198
 
        return False
199
 
 
200
 
    def __init__(self, url=None):
201
 
        if url is None:
202
 
            url = "http://example.com"
203
 
        super(MockTransport, self).__init__(url)
204
 
 
205
 
    def mkdir(self, filename):
206
 
        return
207
 
 
208
 
 
209
 
class InstrumentedTransportStore(store.TransportStore):
210
 
    """An instrumented TransportStore.
211
 
 
212
 
    Here we replace template method worker methods with calls that record the
213
 
    expected results.
214
 
    """
215
 
 
216
 
    def _add(self, filename, file):
217
 
        self._calls.append(("_add", filename, file))
218
 
 
219
 
    def __init__(self, transport, prefixed=False):
220
 
        super(InstrumentedTransportStore, self).__init__(transport, prefixed)
221
 
        self._calls = []
222
 
 
223
 
 
224
 
class TestInstrumentedTransportStore(TestCase):
225
 
 
226
 
    def test__add_records(self):
227
 
        my_store = InstrumentedTransportStore(MockTransport())
228
 
        my_store._add("filename", "file")
229
 
        self.assertEqual([("_add", "filename", "file")], my_store._calls)
230
 
 
231
 
 
232
 
class TestMockTransport(TestCase):
233
 
 
234
 
    def test_isinstance(self):
235
 
        self.failUnless(isinstance(MockTransport(), transport.Transport))
236
 
 
237
 
    def test_has(self):
238
 
        self.assertEqual(False, MockTransport().has('foo'))
239
 
 
240
 
    def test_mkdir(self):
241
 
        MockTransport().mkdir('45')
242
 
 
243
 
 
244
 
class TestTransportStore(TestCase):
245
 
    
246
 
    def test__relpath_invalid(self):
247
 
        my_store = store.TransportStore(MockTransport())
248
 
        self.assertRaises(ValueError, my_store._relpath, '/foo')
249
 
        self.assertRaises(ValueError, my_store._relpath, 'foo/')
250
 
 
251
 
    def test_register_invalid_suffixes(self):
252
 
        my_store = store.TransportStore(MockTransport())
253
 
        self.assertRaises(ValueError, my_store.register_suffix, '/')
254
 
        self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
255
 
 
256
 
    def test__relpath_unregister_suffixes(self):
257
 
        my_store = store.TransportStore(MockTransport())
258
 
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
259
 
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
260
 
 
261
 
    def test__relpath_simple(self):
262
 
        my_store = store.TransportStore(MockTransport())
263
 
        self.assertEqual("foo", my_store._relpath('foo'))
264
 
 
265
 
    def test__relpath_prefixed(self):
266
 
        my_store = store.TransportStore(MockTransport(), True)
267
 
        self.assertEqual('45/foo', my_store._relpath('foo'))
268
 
 
269
 
    def test__relpath_simple_suffixed(self):
270
 
        my_store = store.TransportStore(MockTransport())
271
 
        my_store.register_suffix('bar')
272
 
        my_store.register_suffix('baz')
273
 
        self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
274
 
        self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
275
 
 
276
 
    def test__relpath_prefixed_suffixed(self):
277
 
        my_store = store.TransportStore(MockTransport(), True)
278
 
        my_store.register_suffix('bar')
279
 
        my_store.register_suffix('baz')
280
 
        self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
281
 
        self.assertEqual('45/foo.bar.baz',
282
 
                         my_store._relpath('foo', ['bar', 'baz']))
283
 
 
284
 
    def test_add_simple(self):
285
 
        stream = StringIO("content")
286
 
        my_store = InstrumentedTransportStore(MockTransport())
287
 
        my_store.add(stream, "foo")
288
 
        self.assertEqual([("_add", "foo", stream)], my_store._calls)
289
 
 
290
 
    def test_add_prefixed(self):
291
 
        stream = StringIO("content")
292
 
        my_store = InstrumentedTransportStore(MockTransport(), True)
293
 
        my_store.add(stream, "foo")
294
 
        self.assertEqual([("_add", "45/foo", stream)], my_store._calls)
295
 
 
296
 
    def test_add_simple_suffixed(self):
297
 
        stream = StringIO("content")
298
 
        my_store = InstrumentedTransportStore(MockTransport())
299
 
        my_store.register_suffix('dsc')
300
 
        my_store.add(stream, "foo", 'dsc')
301
 
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
302
 
        
303
 
    def test_add_simple_suffixed(self):
304
 
        stream = StringIO("content")
305
 
        my_store = InstrumentedTransportStore(MockTransport(), True)
306
 
        my_store.register_suffix('dsc')
307
 
        my_store.add(stream, "foo", 'dsc')
308
 
        self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
309
 
 
310
 
    def get_populated_store(self, prefixed=False,
311
 
            store_class=TextStore, compressed=False):
312
 
        my_store = store_class(MemoryTransport(), prefixed,
313
 
                               compressed=compressed)
314
 
        my_store.register_suffix('sig')
315
 
        stream = StringIO("signature")
316
 
        my_store.add(stream, "foo", 'sig')
317
 
        stream = StringIO("content")
318
 
        my_store.add(stream, "foo")
319
 
        stream = StringIO("signature for missing base")
320
 
        my_store.add(stream, "missing", 'sig')
321
 
        return my_store
322
 
        
323
 
    def test_has_simple(self):
324
 
        my_store = self.get_populated_store()
325
 
        self.assertEqual(True, my_store.has_id('foo'))
326
 
        my_store = self.get_populated_store(True)
327
 
        self.assertEqual(True, my_store.has_id('foo'))
328
 
 
329
 
    def test_has_suffixed(self):
330
 
        my_store = self.get_populated_store()
331
 
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
332
 
        my_store = self.get_populated_store(True)
333
 
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
334
 
 
335
 
    def test_has_suffixed_no_base(self):
336
 
        my_store = self.get_populated_store()
337
 
        self.assertEqual(False, my_store.has_id('missing'))
338
 
        my_store = self.get_populated_store(True)
339
 
        self.assertEqual(False, my_store.has_id('missing'))
340
 
 
341
 
    def test_get_simple(self):
342
 
        my_store = self.get_populated_store()
343
 
        self.assertEqual('content', my_store.get('foo').read())
344
 
        my_store = self.get_populated_store(True)
345
 
        self.assertEqual('content', my_store.get('foo').read())
346
 
 
347
 
    def test_get_suffixed(self):
348
 
        my_store = self.get_populated_store()
349
 
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
350
 
        my_store = self.get_populated_store(True)
351
 
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
352
 
 
353
 
    def test_get_suffixed_no_base(self):
354
 
        my_store = self.get_populated_store()
355
 
        self.assertEqual('signature for missing base',
356
 
                         my_store.get('missing', 'sig').read())
357
 
        my_store = self.get_populated_store(True)
358
 
        self.assertEqual('signature for missing base',
359
 
                         my_store.get('missing', 'sig').read())
360
 
 
361
 
    def test___iter__no_suffix(self):
362
 
        my_store = TextStore(MemoryTransport(),
363
 
                             prefixed=False, compressed=False)
364
 
        stream = StringIO("content")
365
 
        my_store.add(stream, "foo")
366
 
        self.assertEqual(set(['foo']),
367
 
                         set(my_store.__iter__()))
368
 
 
369
 
    def test___iter__(self):
370
 
        self.assertEqual(set(['foo']),
371
 
                         set(self.get_populated_store().__iter__()))
372
 
        self.assertEqual(set(['foo']),
373
 
                         set(self.get_populated_store(True).__iter__()))
374
 
 
375
 
    def test___iter__compressed(self):
376
 
        self.assertEqual(set(['foo']),
377
 
                         set(self.get_populated_store(
378
 
                             compressed=True).__iter__()))
379
 
        self.assertEqual(set(['foo']),
380
 
                         set(self.get_populated_store(
381
 
                             True, compressed=True).__iter__()))
382
 
 
383
 
    def test___len__(self):
384
 
        self.assertEqual(1, len(self.get_populated_store()))
385
 
 
386
 
    def test_copy_suffixes(self):
387
 
        from_store = self.get_populated_store()
388
 
        to_store = TextStore(MemoryTransport(),
389
 
                             prefixed=True, compressed=True)
390
 
        to_store.register_suffix('sig')
391
 
        to_store.copy_all_ids(from_store)
392
 
        self.assertEqual(1, len(to_store))
393
 
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
394
 
        self.assertEqual('content', to_store.get('foo').read())
395
 
        self.assertEqual('signature', to_store.get('foo', 'sig').read())
396
 
        self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
397
 
 
398
 
    def test_relpath_escaped(self):
399
 
        my_store = store.TransportStore(MemoryTransport())
400
 
        self.assertEqual('%25', my_store._relpath('%'))
401
 
 
402
 
 
403
 
class TestVersionFileStore(TestCaseWithTransport):
404
 
 
405
 
    def setUp(self):
406
 
        super(TestVersionFileStore, self).setUp()
407
 
        self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
408
 
 
409
 
    def test_get_weave_registers_dirty_in_write(self):
410
 
        transaction = transactions.WriteTransaction()
411
 
        vf = self.vfstore.get_weave_or_empty('id', transaction)
412
 
        transaction.finish()
413
 
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
414
 
        transaction = transactions.WriteTransaction()
415
 
        vf = self.vfstore.get_weave('id', transaction)
416
 
        transaction.finish()
417
 
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
418
 
 
419
 
    def test_get_weave_or_empty_readonly_fails(self):
420
 
        transaction = transactions.ReadOnlyTransaction()
421
 
        vf = self.assertRaises(errors.ReadOnlyError,
422
 
                               self.vfstore.get_weave_or_empty,
423
 
                               'id',
424
 
                               transaction)
425
 
 
426
 
    def test_get_weave_readonly_cant_write(self):
427
 
        transaction = transactions.WriteTransaction()
428
 
        vf = self.vfstore.get_weave_or_empty('id', transaction)
429
 
        transaction.finish()
430
 
        transaction = transactions.ReadOnlyTransaction()
431
 
        vf = self.vfstore.get_weave_or_empty('id', transaction)
432
 
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
433