~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_store.py

Late bind to PatienceSequenceMatcher to allow plugin to override.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
from cStringIO import StringIO
20
20
import os
 
21
import gzip
21
22
 
22
 
from bzrlib.errors import BzrError, UnlistableStore
23
 
from bzrlib.store import copy_all
 
23
import bzrlib.errors as errors
 
24
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
24
25
from bzrlib.transport.local import LocalTransport
25
 
from bzrlib.transport import NoSuchFile
26
 
from bzrlib.store.compressed_text import CompressedTextStore
27
26
from bzrlib.store.text import TextStore
28
 
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
27
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
29
28
import bzrlib.store as store
 
29
import bzrlib.transactions as transactions
30
30
import bzrlib.transport as transport
31
31
from bzrlib.transport.memory import MemoryTransport
32
32
 
33
33
 
34
34
class TestStores(object):
 
35
    """Mixin template class that provides some common tests for stores"""
35
36
 
36
37
    def check_content(self, store, fileid, value):
37
38
        f = store.get(fileid)
50
51
        store_a.add('foo', '1')
51
52
        os.mkdir('b')
52
53
        store_b = self.get_store('b')
53
 
        copy_all(store_a, store_b)
 
54
        store_b.copy_all_ids(store_a)
54
55
        self.assertEqual(store_a.get('1').read(), 'foo')
55
56
        self.assertEqual(store_b.get('1').read(), 'foo')
56
57
        # TODO: Switch the exception form UnlistableStore to
79
80
 
80
81
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
81
82
 
82
 
    def get_store(self, path='.'):
 
83
    def get_store(self, path=u'.'):
83
84
        t = LocalTransport(path)
84
 
        return CompressedTextStore(t)
 
85
        return TextStore(t, compressed=True)
85
86
 
86
87
    def test_total_size(self):
87
 
        store = self.get_store('.')
 
88
        store = self.get_store(u'.')
88
89
        store.register_suffix('dsc')
89
90
        store.add(StringIO('goodbye'), '123123')
90
91
        store.add(StringIO('goodbye2'), '123123', 'dsc')
92
93
        self.assertEqual(store.total_size(), (2, 55))
93
94
        
94
95
    def test__relpath_suffixed(self):
95
 
        my_store = CompressedTextStore(MockTransport(), True)
 
96
        my_store = TextStore(MockTransport(),
 
97
                             prefixed=True, compressed=True)
96
98
        my_store.register_suffix('dsc')
97
 
        self.assertEqual('45/foo.dsc.gz', my_store._relpath('foo', ['dsc']))
 
99
        self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
98
100
 
99
101
 
100
102
class TestMemoryStore(TestCase):
101
103
    
102
104
    def get_store(self):
103
 
        return store.ImmutableMemoryStore()
 
105
        return TextStore(MemoryTransport())
104
106
    
105
 
    def test_imports(self):
106
 
        from bzrlib.store import ImmutableMemoryStore
107
 
 
108
107
    def test_add_and_retrieve(self):
109
108
        store = self.get_store()
110
109
        store.add(StringIO('hello'), 'aa')
138
137
 
139
138
class TestTextStore(TestCaseInTempDir, TestStores):
140
139
 
141
 
    def get_store(self, path='.'):
 
140
    def get_store(self, path=u'.'):
142
141
        t = LocalTransport(path)
143
 
        return TextStore(t)
 
142
        return TextStore(t, compressed=False)
144
143
 
145
144
    def test_total_size(self):
146
145
        store = self.get_store()
154
153
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
155
154
 
156
155
 
 
156
class TestMixedTextStore(TestCaseInTempDir, TestStores):
 
157
 
 
158
    def get_store(self, path=u'.', compressed=True):
 
159
        t = LocalTransport(path)
 
160
        return TextStore(t, compressed=compressed)
 
161
 
 
162
    def test_get_mixed(self):
 
163
        cs = self.get_store(u'.', compressed=True)
 
164
        s = self.get_store(u'.', compressed=False)
 
165
        cs.add(StringIO('hello there'), 'a')
 
166
 
 
167
        self.failUnlessExists('a.gz')
 
168
        self.failIf(os.path.lexists('a'))
 
169
 
 
170
        self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
 
171
 
 
172
        self.assertEquals(cs.has_id('a'), True)
 
173
        self.assertEquals(s.has_id('a'), True)
 
174
        self.assertEquals(cs.get('a').read(), 'hello there')
 
175
        self.assertEquals(s.get('a').read(), 'hello there')
 
176
        
 
177
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
 
178
 
 
179
        s.add(StringIO('goodbye'), 'b')
 
180
        self.failUnlessExists('b')
 
181
        self.failIf(os.path.lexists('b.gz'))
 
182
        self.assertEquals(open('b').read(), 'goodbye')
 
183
 
 
184
        self.assertEquals(cs.has_id('b'), True)
 
185
        self.assertEquals(s.has_id('b'), True)
 
186
        self.assertEquals(cs.get('b').read(), 'goodbye')
 
187
        self.assertEquals(s.get('b').read(), 'goodbye')
 
188
        
 
189
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
 
190
 
157
191
class MockTransport(transport.Transport):
158
192
    """A fake transport for testing with."""
159
193
 
231
265
 
232
266
    def test__relpath_simple_suffixed(self):
233
267
        my_store = store.TransportStore(MockTransport())
234
 
        my_store.register_suffix('gz')
235
268
        my_store.register_suffix('bar')
236
 
        self.assertEqual('foo.gz', my_store._relpath('foo', ['gz']))
237
 
        self.assertEqual('foo.gz.bar', my_store._relpath('foo', ['gz', 'bar']))
 
269
        my_store.register_suffix('baz')
 
270
        self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
 
271
        self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
238
272
 
239
273
    def test__relpath_prefixed_suffixed(self):
240
274
        my_store = store.TransportStore(MockTransport(), True)
241
 
        my_store.register_suffix('gz')
242
275
        my_store.register_suffix('bar')
243
 
        self.assertEqual('45/foo.gz', my_store._relpath('foo', ['gz']))
244
 
        self.assertEqual('45/foo.gz.bar',
245
 
                         my_store._relpath('foo', ['gz', 'bar']))
 
276
        my_store.register_suffix('baz')
 
277
        self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
 
278
        self.assertEqual('45/foo.bar.baz',
 
279
                         my_store._relpath('foo', ['bar', 'baz']))
246
280
 
247
281
    def test_add_simple(self):
248
282
        stream = StringIO("content")
270
304
        my_store.add(stream, "foo", 'dsc')
271
305
        self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
272
306
 
273
 
    def get_populated_store(self, prefixed=False, store_class=TextStore):
274
 
        my_store = store_class(MemoryTransport(), prefixed)
 
307
    def get_populated_store(self, prefixed=False,
 
308
            store_class=TextStore, compressed=False):
 
309
        my_store = store_class(MemoryTransport(), prefixed,
 
310
                               compressed=compressed)
275
311
        my_store.register_suffix('sig')
276
312
        stream = StringIO("signature")
277
313
        my_store.add(stream, "foo", 'sig')
320
356
                         my_store.get('missing', 'sig').read())
321
357
 
322
358
    def test___iter__no_suffix(self):
323
 
        my_store = TextStore(MemoryTransport(), False)
 
359
        my_store = TextStore(MemoryTransport(),
 
360
                             prefixed=False, compressed=False)
324
361
        stream = StringIO("content")
325
362
        my_store.add(stream, "foo")
326
363
        self.assertEqual(set(['foo']),
335
372
    def test___iter__compressed(self):
336
373
        self.assertEqual(set(['foo']),
337
374
                         set(self.get_populated_store(
338
 
                             store_class=CompressedTextStore).__iter__()))
 
375
                             compressed=True).__iter__()))
339
376
        self.assertEqual(set(['foo']),
340
377
                         set(self.get_populated_store(
341
 
                             True, CompressedTextStore).__iter__()))
 
378
                             True, compressed=True).__iter__()))
342
379
 
343
380
    def test___len__(self):
344
381
        self.assertEqual(1, len(self.get_populated_store()))
345
382
 
346
383
    def test_copy_suffixes(self):
347
384
        from_store = self.get_populated_store()
348
 
        to_store = CompressedTextStore(MemoryTransport(), True)
 
385
        to_store = TextStore(MemoryTransport(),
 
386
                             prefixed=True, compressed=True)
349
387
        to_store.register_suffix('sig')
350
 
        copy_all(from_store, to_store)
 
388
        to_store.copy_all_ids(from_store)
351
389
        self.assertEqual(1, len(to_store))
352
390
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
353
391
        self.assertEqual('content', to_store.get('foo').read())
357
395
    def test_relpath_escaped(self):
358
396
        my_store = store.TransportStore(MemoryTransport())
359
397
        self.assertEqual('%25', my_store._relpath('%'))
 
398
 
 
399
    def test_escaped_uppercase(self):
 
400
        """Uppercase letters are escaped for safety on Windows"""
 
401
        my_store = store.TransportStore(MemoryTransport(), escaped=True)
 
402
        # a particularly perverse file-id! :-)
 
403
        self.assertEquals(my_store._escape_file_id('C:<>'), '%43%3a%3c%3e')
 
404
 
 
405
 
 
406
class TestVersionFileStore(TestCaseWithTransport):
 
407
 
 
408
    def setUp(self):
 
409
        super(TestVersionFileStore, self).setUp()
 
410
        self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
 
411
 
 
412
    def test_get_weave_registers_dirty_in_write(self):
 
413
        transaction = transactions.WriteTransaction()
 
414
        vf = self.vfstore.get_weave_or_empty('id', transaction)
 
415
        transaction.finish()
 
416
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
 
417
        transaction = transactions.WriteTransaction()
 
418
        vf = self.vfstore.get_weave('id', transaction)
 
419
        transaction.finish()
 
420
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
 
421
 
 
422
    def test_get_weave_or_empty_readonly_fails(self):
 
423
        transaction = transactions.ReadOnlyTransaction()
 
424
        vf = self.assertRaises(errors.ReadOnlyError,
 
425
                               self.vfstore.get_weave_or_empty,
 
426
                               'id',
 
427
                               transaction)
 
428
 
 
429
    def test_get_weave_readonly_cant_write(self):
 
430
        transaction = transactions.WriteTransaction()
 
431
        vf = self.vfstore.get_weave_or_empty('id', transaction)
 
432
        transaction.finish()
 
433
        transaction = transactions.ReadOnlyTransaction()
 
434
        vf = self.vfstore.get_weave_or_empty('id', transaction)
 
435
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
 
436