~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/teststore.py

  • Committer: Aaron Bentley
  • Date: 2005-10-24 00:51:15 UTC
  • mto: (1185.25.1)
  • mto: This revision was merged to the branch mainline in revision 1488.
  • Revision ID: aaron.bentley@utoronto.ca-20051024005115-11eb6ff177fc195d
Improved sort order of indirect merges

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2005 by Canonical Development Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18
18
 
19
19
from cStringIO import StringIO
20
20
import os
21
 
import gzip
22
21
 
23
 
import bzrlib.errors as errors
24
 
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
 
22
from bzrlib.errors import BzrError, UnlistableStore
 
23
from bzrlib.store import copy_all
25
24
from bzrlib.transport.local import LocalTransport
 
25
from bzrlib.transport import NoSuchFile
 
26
from bzrlib.store.compressed_text import CompressedTextStore
26
27
from bzrlib.store.text import TextStore
27
 
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
 
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
28
29
import bzrlib.store as store
29
 
import bzrlib.transactions as transactions
30
30
import bzrlib.transport as transport
31
31
from bzrlib.transport.memory import MemoryTransport
32
32
 
33
33
 
34
34
class TestStores(object):
35
 
    """Mixin template class that provides some common tests for stores"""
36
35
 
37
36
    def check_content(self, store, fileid, value):
38
37
        f = store.get(fileid)
39
38
        self.assertEqual(f.read(), value)
40
39
 
41
 
    def test_add_str_deprecated(self):
42
 
        os.mkdir('a')
43
 
        store = self.get_store('a')
44
 
        self.callDeprecated(['Passing a string to Store.add'
45
 
                             ' was deprecated in version 0.11.'],
46
 
                            store.add, 'foo', '1')
47
 
        self.assertEqual('foo', store.get('1').read())
48
 
 
49
40
    def fill_store(self, store):
50
41
        store.add(StringIO('hello'), 'a')
51
42
        store.add(StringIO('other'), 'b')
56
47
        """Test copying"""
57
48
        os.mkdir('a')
58
49
        store_a = self.get_store('a')
59
 
        store_a.add(StringIO('foo'), '1')
 
50
        store_a.add('foo', '1')
60
51
        os.mkdir('b')
61
52
        store_b = self.get_store('b')
62
 
        store_b.copy_all_ids(store_a)
 
53
        copy_all(store_a, store_b)
63
54
        self.assertEqual(store_a.get('1').read(), 'foo')
64
55
        self.assertEqual(store_b.get('1').read(), 'foo')
65
56
        # TODO: Switch the exception form UnlistableStore to
88
79
 
89
80
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
90
81
 
91
 
    def get_store(self, path=u'.'):
92
 
        t = transport.get_transport(path)
93
 
        return TextStore(t, compressed=True)
 
82
    def get_store(self, path='.'):
 
83
        t = LocalTransport(path)
 
84
        return CompressedTextStore(t)
94
85
 
95
86
    def test_total_size(self):
96
 
        store = self.get_store(u'.')
 
87
        store = self.get_store('.')
97
88
        store.register_suffix('dsc')
98
89
        store.add(StringIO('goodbye'), '123123')
99
90
        store.add(StringIO('goodbye2'), '123123', 'dsc')
101
92
        self.assertEqual(store.total_size(), (2, 55))
102
93
        
103
94
    def test__relpath_suffixed(self):
104
 
        my_store = TextStore(MockTransport(),
105
 
                             prefixed=True, compressed=True)
 
95
        my_store = CompressedTextStore(MockTransport(), True)
106
96
        my_store.register_suffix('dsc')
107
 
        self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
 
97
        self.assertEqual('45/foo.dsc.gz', my_store._relpath('foo', ['dsc']))
108
98
 
109
99
 
110
100
class TestMemoryStore(TestCase):
111
101
    
112
102
    def get_store(self):
113
 
        return TextStore(MemoryTransport())
 
103
        return store.ImmutableMemoryStore()
114
104
    
 
105
    def test_imports(self):
 
106
        from bzrlib.store import ImmutableMemoryStore
 
107
 
115
108
    def test_add_and_retrieve(self):
116
109
        store = self.get_store()
117
110
        store.add(StringIO('hello'), 'aa')
145
138
 
146
139
class TestTextStore(TestCaseInTempDir, TestStores):
147
140
 
148
 
    def get_store(self, path=u'.'):
149
 
        t = transport.get_transport(path)
150
 
        return TextStore(t, compressed=False)
 
141
    def get_store(self, path='.'):
 
142
        t = LocalTransport(path)
 
143
        return TextStore(t)
151
144
 
152
145
    def test_total_size(self):
153
146
        store = self.get_store()
161
154
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
162
155
 
163
156
 
164
 
class TestMixedTextStore(TestCaseInTempDir, TestStores):
165
 
 
166
 
    def get_store(self, path=u'.', compressed=True):
167
 
        t = transport.get_transport(path)
168
 
        return TextStore(t, compressed=compressed)
169
 
 
170
 
    def test_get_mixed(self):
171
 
        cs = self.get_store(u'.', compressed=True)
172
 
        s = self.get_store(u'.', compressed=False)
173
 
        cs.add(StringIO('hello there'), 'a')
174
 
 
175
 
        self.failUnlessExists('a.gz')
176
 
        self.failIf(os.path.lexists('a'))
177
 
 
178
 
        self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
179
 
 
180
 
        self.assertEquals(cs.has_id('a'), True)
181
 
        self.assertEquals(s.has_id('a'), True)
182
 
        self.assertEquals(cs.get('a').read(), 'hello there')
183
 
        self.assertEquals(s.get('a').read(), 'hello there')
184
 
        
185
 
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
186
 
 
187
 
        s.add(StringIO('goodbye'), 'b')
188
 
        self.failUnlessExists('b')
189
 
        self.failIf(os.path.lexists('b.gz'))
190
 
        self.assertEquals(open('b').read(), 'goodbye')
191
 
 
192
 
        self.assertEquals(cs.has_id('b'), True)
193
 
        self.assertEquals(s.has_id('b'), True)
194
 
        self.assertEquals(cs.get('b').read(), 'goodbye')
195
 
        self.assertEquals(s.get('b').read(), 'goodbye')
196
 
        
197
 
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
198
 
 
199
157
class MockTransport(transport.Transport):
200
158
    """A fake transport for testing with."""
201
159
 
273
231
 
274
232
    def test__relpath_simple_suffixed(self):
275
233
        my_store = store.TransportStore(MockTransport())
 
234
        my_store.register_suffix('gz')
276
235
        my_store.register_suffix('bar')
277
 
        my_store.register_suffix('baz')
278
 
        self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
279
 
        self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
 
236
        self.assertEqual('foo.gz', my_store._relpath('foo', ['gz']))
 
237
        self.assertEqual('foo.gz.bar', my_store._relpath('foo', ['gz', 'bar']))
280
238
 
281
239
    def test__relpath_prefixed_suffixed(self):
282
240
        my_store = store.TransportStore(MockTransport(), True)
 
241
        my_store.register_suffix('gz')
283
242
        my_store.register_suffix('bar')
284
 
        my_store.register_suffix('baz')
285
 
        self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
286
 
        self.assertEqual('45/foo.bar.baz',
287
 
                         my_store._relpath('foo', ['bar', 'baz']))
 
243
        self.assertEqual('45/foo.gz', my_store._relpath('foo', ['gz']))
 
244
        self.assertEqual('45/foo.gz.bar',
 
245
                         my_store._relpath('foo', ['gz', 'bar']))
288
246
 
289
247
    def test_add_simple(self):
290
248
        stream = StringIO("content")
312
270
        my_store.add(stream, "foo", 'dsc')
313
271
        self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
314
272
 
315
 
    def get_populated_store(self, prefixed=False,
316
 
            store_class=TextStore, compressed=False):
317
 
        my_store = store_class(MemoryTransport(), prefixed,
318
 
                               compressed=compressed)
 
273
    def get_populated_store(self, prefixed=False, store_class=TextStore):
 
274
        my_store = store_class(MemoryTransport(), prefixed)
319
275
        my_store.register_suffix('sig')
320
276
        stream = StringIO("signature")
321
277
        my_store.add(stream, "foo", 'sig')
364
320
                         my_store.get('missing', 'sig').read())
365
321
 
366
322
    def test___iter__no_suffix(self):
367
 
        my_store = TextStore(MemoryTransport(),
368
 
                             prefixed=False, compressed=False)
 
323
        my_store = TextStore(MemoryTransport(), False)
369
324
        stream = StringIO("content")
370
325
        my_store.add(stream, "foo")
371
326
        self.assertEqual(set(['foo']),
380
335
    def test___iter__compressed(self):
381
336
        self.assertEqual(set(['foo']),
382
337
                         set(self.get_populated_store(
383
 
                             compressed=True).__iter__()))
 
338
                             store_class=CompressedTextStore).__iter__()))
384
339
        self.assertEqual(set(['foo']),
385
340
                         set(self.get_populated_store(
386
 
                             True, compressed=True).__iter__()))
 
341
                             True, CompressedTextStore).__iter__()))
387
342
 
388
343
    def test___len__(self):
389
344
        self.assertEqual(1, len(self.get_populated_store()))
390
345
 
391
346
    def test_copy_suffixes(self):
392
347
        from_store = self.get_populated_store()
393
 
        to_store = TextStore(MemoryTransport(),
394
 
                             prefixed=True, compressed=True)
 
348
        to_store = CompressedTextStore(MemoryTransport(), True)
395
349
        to_store.register_suffix('sig')
396
 
        to_store.copy_all_ids(from_store)
 
350
        copy_all(from_store, to_store)
397
351
        self.assertEqual(1, len(to_store))
398
352
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
399
353
        self.assertEqual('content', to_store.get('foo').read())
403
357
    def test_relpath_escaped(self):
404
358
        my_store = store.TransportStore(MemoryTransport())
405
359
        self.assertEqual('%25', my_store._relpath('%'))
406
 
 
407
 
    def test_escaped_uppercase(self):
408
 
        """Uppercase letters are escaped for safety on Windows"""
409
 
        my_store = store.TransportStore(MemoryTransport(), escaped=True)
410
 
        # a particularly perverse file-id! :-)
411
 
        self.assertEquals(my_store._escape_file_id('C:<>'), '%43%3a%3c%3e')
412
 
 
413
 
 
414
 
class TestVersionFileStore(TestCaseWithTransport):
415
 
 
416
 
    def setUp(self):
417
 
        super(TestVersionFileStore, self).setUp()
418
 
        self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
419
 
 
420
 
    def test_get_weave_registers_dirty_in_write(self):
421
 
        transaction = transactions.WriteTransaction()
422
 
        vf = self.vfstore.get_weave_or_empty('id', transaction)
423
 
        transaction.finish()
424
 
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
425
 
        transaction = transactions.WriteTransaction()
426
 
        vf = self.vfstore.get_weave('id', transaction)
427
 
        transaction.finish()
428
 
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
429
 
 
430
 
    def test_get_weave_or_empty_readonly_fails(self):
431
 
        transaction = transactions.ReadOnlyTransaction()
432
 
        vf = self.assertRaises(errors.ReadOnlyError,
433
 
                               self.vfstore.get_weave_or_empty,
434
 
                               'id',
435
 
                               transaction)
436
 
 
437
 
    def test_get_weave_readonly_cant_write(self):
438
 
        transaction = transactions.WriteTransaction()
439
 
        vf = self.vfstore.get_weave_or_empty('id', transaction)
440
 
        transaction.finish()
441
 
        transaction = transactions.ReadOnlyTransaction()
442
 
        vf = self.vfstore.get_weave_or_empty('id', transaction)
443
 
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
444