~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_store.py

[merge] Storage filename escaping

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2007 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2005 by Canonical Development Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Test Store implementations."""
18
18
 
26
26
from bzrlib.store.text import TextStore
27
27
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
28
28
import bzrlib.store as store
29
 
import bzrlib.store.versioned
30
29
import bzrlib.transactions as transactions
31
30
import bzrlib.transport as transport
32
31
from bzrlib.transport.memory import MemoryTransport
49
48
        """Test copying"""
50
49
        os.mkdir('a')
51
50
        store_a = self.get_store('a')
52
 
        store_a.add(StringIO('foo'), '1')
 
51
        store_a.add('foo', '1')
53
52
        os.mkdir('b')
54
53
        store_b = self.get_store('b')
55
54
        store_b.copy_all_ids(store_a)
64
63
    def test_get(self):
65
64
        store = self.get_store()
66
65
        self.fill_store(store)
67
 
 
 
66
    
68
67
        self.check_content(store, 'a', 'hello')
69
68
        self.check_content(store, 'b', 'other')
70
69
        self.check_content(store, 'c', 'something')
71
 
 
 
70
    
72
71
        # Make sure that requesting a non-existing file fails
73
72
        self.assertRaises(KeyError, self.check_content, store, 'd', None)
74
73
 
82
81
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
83
82
 
84
83
    def get_store(self, path=u'.'):
85
 
        t = transport.get_transport(path)
 
84
        t = LocalTransport(path)
86
85
        return TextStore(t, compressed=True)
87
86
 
88
87
    def test_total_size(self):
92
91
        store.add(StringIO('goodbye2'), '123123', 'dsc')
93
92
        # these get gzipped - content should be stable
94
93
        self.assertEqual(store.total_size(), (2, 55))
95
 
 
 
94
        
96
95
    def test__relpath_suffixed(self):
97
96
        my_store = TextStore(MockTransport(),
98
97
                             prefixed=True, compressed=True)
101
100
 
102
101
 
103
102
class TestMemoryStore(TestCase):
104
 
 
 
103
    
105
104
    def get_store(self):
106
 
        return TextStore(MemoryTransport())
 
105
        return store.ImmutableMemoryStore()
 
106
    
 
107
    def test_imports(self):
 
108
        from bzrlib.store import ImmutableMemoryStore
107
109
 
108
110
    def test_add_and_retrieve(self):
109
111
        store = self.get_store()
139
141
class TestTextStore(TestCaseInTempDir, TestStores):
140
142
 
141
143
    def get_store(self, path=u'.'):
142
 
        t = transport.get_transport(path)
 
144
        t = LocalTransport(path)
143
145
        return TextStore(t, compressed=False)
144
146
 
145
147
    def test_total_size(self):
157
159
class TestMixedTextStore(TestCaseInTempDir, TestStores):
158
160
 
159
161
    def get_store(self, path=u'.', compressed=True):
160
 
        t = transport.get_transport(path)
 
162
        t = LocalTransport(path)
161
163
        return TextStore(t, compressed=compressed)
162
164
 
163
165
    def test_get_mixed(self):
174
176
        self.assertEquals(s.has_id('a'), True)
175
177
        self.assertEquals(cs.get('a').read(), 'hello there')
176
178
        self.assertEquals(s.get('a').read(), 'hello there')
177
 
 
 
179
        
178
180
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
179
181
 
180
182
        s.add(StringIO('goodbye'), 'b')
186
188
        self.assertEquals(s.has_id('b'), True)
187
189
        self.assertEquals(cs.get('b').read(), 'goodbye')
188
190
        self.assertEquals(s.get('b').read(), 'goodbye')
189
 
 
 
191
        
190
192
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
191
193
 
192
194
class MockTransport(transport.Transport):
240
242
 
241
243
 
242
244
class TestTransportStore(TestCase):
243
 
 
 
245
    
244
246
    def test__relpath_invalid(self):
245
247
        my_store = store.TransportStore(MockTransport())
246
248
        self.assertRaises(ValueError, my_store._relpath, '/foo')
297
299
        my_store.register_suffix('dsc')
298
300
        my_store.add(stream, "foo", 'dsc')
299
301
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
300
 
 
 
302
        
301
303
    def test_add_simple_suffixed(self):
302
304
        stream = StringIO("content")
303
305
        my_store = InstrumentedTransportStore(MockTransport(), True)
317
319
        stream = StringIO("signature for missing base")
318
320
        my_store.add(stream, "missing", 'sig')
319
321
        return my_store
320
 
 
 
322
        
321
323
    def test_has_simple(self):
322
324
        my_store = self.get_populated_store()
323
325
        self.assertEqual(True, my_store.has_id('foo'))
397
399
        my_store = store.TransportStore(MemoryTransport())
398
400
        self.assertEqual('%25', my_store._relpath('%'))
399
401
 
400
 
    def test_escaped_uppercase(self):
401
 
        """Uppercase letters are escaped for safety on Windows"""
402
 
        my_store = store.TransportStore(MemoryTransport(), prefixed=True,
403
 
            escaped=True)
404
 
        # a particularly perverse file-id! :-)
405
 
        self.assertEquals(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
406
 
 
407
402
 
408
403
class TestVersionFileStore(TestCaseWithTransport):
409
404
 
410
 
    def get_scope(self):
411
 
        return self._transaction
412
 
 
413
405
    def setUp(self):
414
406
        super(TestVersionFileStore, self).setUp()
415
407
        self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
416
 
        self.vfstore.get_scope = self.get_scope
417
 
        self._transaction = None
418
408
 
419
409
    def test_get_weave_registers_dirty_in_write(self):
420
 
        self._transaction = transactions.WriteTransaction()
421
 
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
422
 
        self._transaction.finish()
423
 
        self._transaction = None
424
 
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
425
 
        self._transaction = transactions.WriteTransaction()
426
 
        vf = self.vfstore.get_weave('id', self._transaction)
427
 
        self._transaction.finish()
428
 
        self._transaction = None
429
 
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
 
410
        transaction = transactions.WriteTransaction()
 
411
        vf = self.vfstore.get_weave_or_empty('id', transaction)
 
412
        transaction.finish()
 
413
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
 
414
        transaction = transactions.WriteTransaction()
 
415
        vf = self.vfstore.get_weave('id', transaction)
 
416
        transaction.finish()
 
417
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
 
418
 
 
419
    def test_get_weave_or_empty_readonly_fails(self):
 
420
        transaction = transactions.ReadOnlyTransaction()
 
421
        vf = self.assertRaises(errors.ReadOnlyError,
 
422
                               self.vfstore.get_weave_or_empty,
 
423
                               'id',
 
424
                               transaction)
430
425
 
431
426
    def test_get_weave_readonly_cant_write(self):
432
 
        self._transaction = transactions.WriteTransaction()
433
 
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
434
 
        self._transaction.finish()
435
 
        self._transaction = transactions.ReadOnlyTransaction()
436
 
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
 
427
        transaction = transactions.WriteTransaction()
 
428
        vf = self.vfstore.get_weave_or_empty('id', transaction)
 
429
        transaction.finish()
 
430
        transaction = transactions.ReadOnlyTransaction()
 
431
        vf = self.vfstore.get_weave_or_empty('id', transaction)
437
432
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
438
433
 
439
 
    def test___iter__escaped(self):
440
 
        self.vfstore = store.versioned.VersionedFileStore(MemoryTransport(),
441
 
            prefixed=True, escaped=True)
442
 
        self.vfstore.get_scope = self.get_scope
443
 
        self._transaction = transactions.WriteTransaction()
444
 
        vf = self.vfstore.get_weave_or_empty(' ', self._transaction)
445
 
        vf.add_lines('a', [], [])
446
 
        del vf
447
 
        self._transaction.finish()
448
 
        self.assertEqual([' '], list(self.vfstore))