~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_store.py

  • Committer: Aaron Bentley
  • Date: 2006-11-10 01:55:55 UTC
  • mto: This revision was merged to the branch mainline in revision 2127.
  • Revision ID: aaron.bentley@utoronto.ca-20061110015555-f48202744b630209
Ignore html docs (both kinds)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Development Ltd
2
 
 
 
1
# Copyright (C) 2005 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18
18
 
19
19
from cStringIO import StringIO
20
20
import os
 
21
import gzip
21
22
 
22
 
from bzrlib.errors import BzrError, UnlistableStore
23
 
from bzrlib.store import copy_all
 
23
import bzrlib.errors as errors
 
24
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
24
25
from bzrlib.transport.local import LocalTransport
25
 
from bzrlib.transport import NoSuchFile
26
 
from bzrlib.store.compressed_text import CompressedTextStore
27
26
from bzrlib.store.text import TextStore
28
 
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
27
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
29
28
import bzrlib.store as store
 
29
import bzrlib.transactions as transactions
30
30
import bzrlib.transport as transport
31
31
from bzrlib.transport.memory import MemoryTransport
32
32
 
33
33
 
34
34
class TestStores(object):
 
35
    """Mixin template class that provides some common tests for stores"""
35
36
 
36
37
    def check_content(self, store, fileid, value):
37
38
        f = store.get(fileid)
38
39
        self.assertEqual(f.read(), value)
39
40
 
 
41
    def test_add_str_deprecated(self):
 
42
        os.mkdir('a')
 
43
        store = self.get_store('a')
 
44
        self.callDeprecated(['Passing a string to Store.add'
 
45
                             ' was deprecated in version 0.11.'],
 
46
                            store.add, 'foo', '1')
 
47
        self.assertEqual('foo', store.get('1').read())
 
48
 
40
49
    def fill_store(self, store):
41
50
        store.add(StringIO('hello'), 'a')
42
51
        store.add(StringIO('other'), 'b')
47
56
        """Test copying"""
48
57
        os.mkdir('a')
49
58
        store_a = self.get_store('a')
50
 
        store_a.add('foo', '1')
 
59
        store_a.add(StringIO('foo'), '1')
51
60
        os.mkdir('b')
52
61
        store_b = self.get_store('b')
53
 
        copy_all(store_a, store_b)
 
62
        store_b.copy_all_ids(store_a)
54
63
        self.assertEqual(store_a.get('1').read(), 'foo')
55
64
        self.assertEqual(store_b.get('1').read(), 'foo')
56
65
        # TODO: Switch the exception form UnlistableStore to
79
88
 
80
89
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
81
90
 
82
 
    def get_store(self, path='.'):
83
 
        t = LocalTransport(path)
84
 
        return CompressedTextStore(t)
 
91
    def get_store(self, path=u'.'):
 
92
        t = transport.get_transport(path)
 
93
        return TextStore(t, compressed=True)
85
94
 
86
95
    def test_total_size(self):
87
 
        store = self.get_store('.')
 
96
        store = self.get_store(u'.')
88
97
        store.register_suffix('dsc')
89
98
        store.add(StringIO('goodbye'), '123123')
90
99
        store.add(StringIO('goodbye2'), '123123', 'dsc')
92
101
        self.assertEqual(store.total_size(), (2, 55))
93
102
        
94
103
    def test__relpath_suffixed(self):
95
 
        my_store = CompressedTextStore(MockTransport(), True)
 
104
        my_store = TextStore(MockTransport(),
 
105
                             prefixed=True, compressed=True)
96
106
        my_store.register_suffix('dsc')
97
 
        self.assertEqual('45/foo.dsc.gz', my_store._relpath('foo', ['dsc']))
 
107
        self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
98
108
 
99
109
 
100
110
class TestMemoryStore(TestCase):
101
111
    
102
112
    def get_store(self):
103
 
        return store.ImmutableMemoryStore()
 
113
        return TextStore(MemoryTransport())
104
114
    
105
 
    def test_imports(self):
106
 
        from bzrlib.store import ImmutableMemoryStore
107
 
 
108
115
    def test_add_and_retrieve(self):
109
116
        store = self.get_store()
110
117
        store.add(StringIO('hello'), 'aa')
138
145
 
139
146
class TestTextStore(TestCaseInTempDir, TestStores):
140
147
 
141
 
    def get_store(self, path='.'):
142
 
        t = LocalTransport(path)
143
 
        return TextStore(t)
 
148
    def get_store(self, path=u'.'):
 
149
        t = transport.get_transport(path)
 
150
        return TextStore(t, compressed=False)
144
151
 
145
152
    def test_total_size(self):
146
153
        store = self.get_store()
154
161
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
155
162
 
156
163
 
 
164
class TestMixedTextStore(TestCaseInTempDir, TestStores):
 
165
 
 
166
    def get_store(self, path=u'.', compressed=True):
 
167
        t = transport.get_transport(path)
 
168
        return TextStore(t, compressed=compressed)
 
169
 
 
170
    def test_get_mixed(self):
 
171
        cs = self.get_store(u'.', compressed=True)
 
172
        s = self.get_store(u'.', compressed=False)
 
173
        cs.add(StringIO('hello there'), 'a')
 
174
 
 
175
        self.failUnlessExists('a.gz')
 
176
        self.failIf(os.path.lexists('a'))
 
177
 
 
178
        self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
 
179
 
 
180
        self.assertEquals(cs.has_id('a'), True)
 
181
        self.assertEquals(s.has_id('a'), True)
 
182
        self.assertEquals(cs.get('a').read(), 'hello there')
 
183
        self.assertEquals(s.get('a').read(), 'hello there')
 
184
        
 
185
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
 
186
 
 
187
        s.add(StringIO('goodbye'), 'b')
 
188
        self.failUnlessExists('b')
 
189
        self.failIf(os.path.lexists('b.gz'))
 
190
        self.assertEquals(open('b').read(), 'goodbye')
 
191
 
 
192
        self.assertEquals(cs.has_id('b'), True)
 
193
        self.assertEquals(s.has_id('b'), True)
 
194
        self.assertEquals(cs.get('b').read(), 'goodbye')
 
195
        self.assertEquals(s.get('b').read(), 'goodbye')
 
196
        
 
197
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
 
198
 
157
199
class MockTransport(transport.Transport):
158
200
    """A fake transport for testing with."""
159
201
 
231
273
 
232
274
    def test__relpath_simple_suffixed(self):
233
275
        my_store = store.TransportStore(MockTransport())
234
 
        my_store.register_suffix('gz')
235
276
        my_store.register_suffix('bar')
236
 
        self.assertEqual('foo.gz', my_store._relpath('foo', ['gz']))
237
 
        self.assertEqual('foo.gz.bar', my_store._relpath('foo', ['gz', 'bar']))
 
277
        my_store.register_suffix('baz')
 
278
        self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
 
279
        self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
238
280
 
239
281
    def test__relpath_prefixed_suffixed(self):
240
282
        my_store = store.TransportStore(MockTransport(), True)
241
 
        my_store.register_suffix('gz')
242
283
        my_store.register_suffix('bar')
243
 
        self.assertEqual('45/foo.gz', my_store._relpath('foo', ['gz']))
244
 
        self.assertEqual('45/foo.gz.bar',
245
 
                         my_store._relpath('foo', ['gz', 'bar']))
 
284
        my_store.register_suffix('baz')
 
285
        self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
 
286
        self.assertEqual('45/foo.bar.baz',
 
287
                         my_store._relpath('foo', ['bar', 'baz']))
246
288
 
247
289
    def test_add_simple(self):
248
290
        stream = StringIO("content")
270
312
        my_store.add(stream, "foo", 'dsc')
271
313
        self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
272
314
 
273
 
    def get_populated_store(self, prefixed=False, store_class=TextStore):
274
 
        my_store = store_class(MemoryTransport(), prefixed)
 
315
    def get_populated_store(self, prefixed=False,
 
316
            store_class=TextStore, compressed=False):
 
317
        my_store = store_class(MemoryTransport(), prefixed,
 
318
                               compressed=compressed)
275
319
        my_store.register_suffix('sig')
276
320
        stream = StringIO("signature")
277
321
        my_store.add(stream, "foo", 'sig')
320
364
                         my_store.get('missing', 'sig').read())
321
365
 
322
366
    def test___iter__no_suffix(self):
323
 
        my_store = TextStore(MemoryTransport(), False)
 
367
        my_store = TextStore(MemoryTransport(),
 
368
                             prefixed=False, compressed=False)
324
369
        stream = StringIO("content")
325
370
        my_store.add(stream, "foo")
326
371
        self.assertEqual(set(['foo']),
335
380
    def test___iter__compressed(self):
336
381
        self.assertEqual(set(['foo']),
337
382
                         set(self.get_populated_store(
338
 
                             store_class=CompressedTextStore).__iter__()))
 
383
                             compressed=True).__iter__()))
339
384
        self.assertEqual(set(['foo']),
340
385
                         set(self.get_populated_store(
341
 
                             True, CompressedTextStore).__iter__()))
 
386
                             True, compressed=True).__iter__()))
342
387
 
343
388
    def test___len__(self):
344
389
        self.assertEqual(1, len(self.get_populated_store()))
345
390
 
346
391
    def test_copy_suffixes(self):
347
392
        from_store = self.get_populated_store()
348
 
        to_store = CompressedTextStore(MemoryTransport(), True)
 
393
        to_store = TextStore(MemoryTransport(),
 
394
                             prefixed=True, compressed=True)
349
395
        to_store.register_suffix('sig')
350
 
        copy_all(from_store, to_store)
 
396
        to_store.copy_all_ids(from_store)
351
397
        self.assertEqual(1, len(to_store))
352
398
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
353
399
        self.assertEqual('content', to_store.get('foo').read())
357
403
    def test_relpath_escaped(self):
358
404
        my_store = store.TransportStore(MemoryTransport())
359
405
        self.assertEqual('%25', my_store._relpath('%'))
 
406
 
 
407
    def test_escaped_uppercase(self):
 
408
        """Uppercase letters are escaped for safety on Windows"""
 
409
        my_store = store.TransportStore(MemoryTransport(), escaped=True)
 
410
        # a particularly perverse file-id! :-)
 
411
        self.assertEquals(my_store._escape_file_id('C:<>'), '%43%3a%3c%3e')
 
412
 
 
413
 
 
414
class TestVersionFileStore(TestCaseWithTransport):
 
415
 
 
416
    def setUp(self):
 
417
        super(TestVersionFileStore, self).setUp()
 
418
        self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
 
419
 
 
420
    def test_get_weave_registers_dirty_in_write(self):
 
421
        transaction = transactions.WriteTransaction()
 
422
        vf = self.vfstore.get_weave_or_empty('id', transaction)
 
423
        transaction.finish()
 
424
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
 
425
        transaction = transactions.WriteTransaction()
 
426
        vf = self.vfstore.get_weave('id', transaction)
 
427
        transaction.finish()
 
428
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
 
429
 
 
430
    def test_get_weave_or_empty_readonly_fails(self):
 
431
        transaction = transactions.ReadOnlyTransaction()
 
432
        vf = self.assertRaises(errors.ReadOnlyError,
 
433
                               self.vfstore.get_weave_or_empty,
 
434
                               'id',
 
435
                               transaction)
 
436
 
 
437
    def test_get_weave_readonly_cant_write(self):
 
438
        transaction = transactions.WriteTransaction()
 
439
        vf = self.vfstore.get_weave_or_empty('id', transaction)
 
440
        transaction.finish()
 
441
        transaction = transactions.ReadOnlyTransaction()
 
442
        vf = self.vfstore.get_weave_or_empty('id', transaction)
 
443
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
 
444