~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_store.py

  • Committer: Patch Queue Manager
  • Date: 2011-09-22 14:12:18 UTC
  • mfrom: (6155.3.1 jam)
  • Revision ID: pqm@pqm.ubuntu.com-20110922141218-86s4uu6nqvourw4f
(jameinel) Cleanup comments bzrlib/smart/__init__.py (John A Meinel)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Development Ltd
2
 
 
 
1
# Copyright (C) 2005-2009, 2011 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Test Store implementations."""
18
18
 
21
21
import gzip
22
22
 
23
23
import bzrlib.errors as errors
24
 
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
25
 
from bzrlib.transport.local import LocalTransport
 
24
from bzrlib.errors import BzrError
 
25
from bzrlib.store import TransportStore
26
26
from bzrlib.store.text import TextStore
 
27
from bzrlib.store.versioned import VersionedFileStore
27
28
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
28
 
import bzrlib.store as store
29
29
import bzrlib.transactions as transactions
30
30
import bzrlib.transport as transport
31
31
from bzrlib.transport.memory import MemoryTransport
 
32
from bzrlib.weave import WeaveFile
32
33
 
33
34
 
34
35
class TestStores(object):
48
49
        """Test copying"""
49
50
        os.mkdir('a')
50
51
        store_a = self.get_store('a')
51
 
        store_a.add('foo', '1')
 
52
        store_a.add(StringIO('foo'), '1')
52
53
        os.mkdir('b')
53
54
        store_b = self.get_store('b')
54
55
        store_b.copy_all_ids(store_a)
63
64
    def test_get(self):
64
65
        store = self.get_store()
65
66
        self.fill_store(store)
66
 
    
 
67
 
67
68
        self.check_content(store, 'a', 'hello')
68
69
        self.check_content(store, 'b', 'other')
69
70
        self.check_content(store, 'c', 'something')
70
 
    
 
71
 
71
72
        # Make sure that requesting a non-existing file fails
72
73
        self.assertRaises(KeyError, self.check_content, store, 'd', None)
73
74
 
81
82
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
82
83
 
83
84
    def get_store(self, path=u'.'):
84
 
        t = transport.get_transport(path)
 
85
        t = transport.get_transport_from_path(path)
85
86
        return TextStore(t, compressed=True)
86
87
 
87
88
    def test_total_size(self):
91
92
        store.add(StringIO('goodbye2'), '123123', 'dsc')
92
93
        # these get gzipped - content should be stable
93
94
        self.assertEqual(store.total_size(), (2, 55))
94
 
        
 
95
 
95
96
    def test__relpath_suffixed(self):
96
97
        my_store = TextStore(MockTransport(),
97
98
                             prefixed=True, compressed=True)
100
101
 
101
102
 
102
103
class TestMemoryStore(TestCase):
103
 
    
 
104
 
104
105
    def get_store(self):
105
106
        return TextStore(MemoryTransport())
106
 
    
 
107
 
107
108
    def test_add_and_retrieve(self):
108
109
        store = self.get_store()
109
110
        store.add(StringIO('hello'), 'aa')
115
116
 
116
117
    def test_missing_is_absent(self):
117
118
        store = self.get_store()
118
 
        self.failIf('aa' in store)
 
119
        self.assertFalse('aa' in store)
119
120
 
120
121
    def test_adding_fails_when_present(self):
121
122
        my_store = self.get_store()
138
139
class TestTextStore(TestCaseInTempDir, TestStores):
139
140
 
140
141
    def get_store(self, path=u'.'):
141
 
        t = transport.get_transport(path)
 
142
        t = transport.get_transport_from_path(path)
142
143
        return TextStore(t, compressed=False)
143
144
 
144
145
    def test_total_size(self):
156
157
class TestMixedTextStore(TestCaseInTempDir, TestStores):
157
158
 
158
159
    def get_store(self, path=u'.', compressed=True):
159
 
        t = transport.get_transport(path)
 
160
        t = transport.get_transport_from_path(path)
160
161
        return TextStore(t, compressed=compressed)
161
162
 
162
163
    def test_get_mixed(self):
164
165
        s = self.get_store(u'.', compressed=False)
165
166
        cs.add(StringIO('hello there'), 'a')
166
167
 
167
 
        self.failUnlessExists('a.gz')
168
 
        self.failIf(os.path.lexists('a'))
 
168
        self.assertPathExists('a.gz')
 
169
        self.assertFalse(os.path.lexists('a'))
169
170
 
170
171
        self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
171
172
 
173
174
        self.assertEquals(s.has_id('a'), True)
174
175
        self.assertEquals(cs.get('a').read(), 'hello there')
175
176
        self.assertEquals(s.get('a').read(), 'hello there')
176
 
        
 
177
 
177
178
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
178
179
 
179
180
        s.add(StringIO('goodbye'), 'b')
180
 
        self.failUnlessExists('b')
181
 
        self.failIf(os.path.lexists('b.gz'))
 
181
        self.assertPathExists('b')
 
182
        self.assertFalse(os.path.lexists('b.gz'))
182
183
        self.assertEquals(open('b').read(), 'goodbye')
183
184
 
184
185
        self.assertEquals(cs.has_id('b'), True)
185
186
        self.assertEquals(s.has_id('b'), True)
186
187
        self.assertEquals(cs.get('b').read(), 'goodbye')
187
188
        self.assertEquals(s.get('b').read(), 'goodbye')
188
 
        
 
189
 
189
190
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
190
191
 
191
192
class MockTransport(transport.Transport):
203
204
        return
204
205
 
205
206
 
206
 
class InstrumentedTransportStore(store.TransportStore):
 
207
class InstrumentedTransportStore(TransportStore):
207
208
    """An instrumented TransportStore.
208
209
 
209
210
    Here we replace template method worker methods with calls that record the
229
230
class TestMockTransport(TestCase):
230
231
 
231
232
    def test_isinstance(self):
232
 
        self.failUnless(isinstance(MockTransport(), transport.Transport))
 
233
        self.assertIsInstance(MockTransport(), transport.Transport)
233
234
 
234
235
    def test_has(self):
235
236
        self.assertEqual(False, MockTransport().has('foo'))
239
240
 
240
241
 
241
242
class TestTransportStore(TestCase):
242
 
    
 
243
 
243
244
    def test__relpath_invalid(self):
244
 
        my_store = store.TransportStore(MockTransport())
 
245
        my_store = TransportStore(MockTransport())
245
246
        self.assertRaises(ValueError, my_store._relpath, '/foo')
246
247
        self.assertRaises(ValueError, my_store._relpath, 'foo/')
247
248
 
248
249
    def test_register_invalid_suffixes(self):
249
 
        my_store = store.TransportStore(MockTransport())
 
250
        my_store = TransportStore(MockTransport())
250
251
        self.assertRaises(ValueError, my_store.register_suffix, '/')
251
252
        self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
252
253
 
253
254
    def test__relpath_unregister_suffixes(self):
254
 
        my_store = store.TransportStore(MockTransport())
 
255
        my_store = TransportStore(MockTransport())
255
256
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
256
257
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
257
258
 
258
259
    def test__relpath_simple(self):
259
 
        my_store = store.TransportStore(MockTransport())
 
260
        my_store = TransportStore(MockTransport())
260
261
        self.assertEqual("foo", my_store._relpath('foo'))
261
262
 
262
263
    def test__relpath_prefixed(self):
263
 
        my_store = store.TransportStore(MockTransport(), True)
 
264
        my_store = TransportStore(MockTransport(), True)
264
265
        self.assertEqual('45/foo', my_store._relpath('foo'))
265
266
 
266
267
    def test__relpath_simple_suffixed(self):
267
 
        my_store = store.TransportStore(MockTransport())
 
268
        my_store = TransportStore(MockTransport())
268
269
        my_store.register_suffix('bar')
269
270
        my_store.register_suffix('baz')
270
271
        self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
271
272
        self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
272
273
 
273
274
    def test__relpath_prefixed_suffixed(self):
274
 
        my_store = store.TransportStore(MockTransport(), True)
 
275
        my_store = TransportStore(MockTransport(), True)
275
276
        my_store.register_suffix('bar')
276
277
        my_store.register_suffix('baz')
277
278
        self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
296
297
        my_store.register_suffix('dsc')
297
298
        my_store.add(stream, "foo", 'dsc')
298
299
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
299
 
        
 
300
 
300
301
    def test_add_simple_suffixed(self):
301
302
        stream = StringIO("content")
302
303
        my_store = InstrumentedTransportStore(MockTransport(), True)
316
317
        stream = StringIO("signature for missing base")
317
318
        my_store.add(stream, "missing", 'sig')
318
319
        return my_store
319
 
        
 
320
 
320
321
    def test_has_simple(self):
321
322
        my_store = self.get_populated_store()
322
323
        self.assertEqual(True, my_store.has_id('foo'))
393
394
        self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
394
395
 
395
396
    def test_relpath_escaped(self):
396
 
        my_store = store.TransportStore(MemoryTransport())
 
397
        my_store = TransportStore(MemoryTransport())
397
398
        self.assertEqual('%25', my_store._relpath('%'))
398
399
 
399
400
    def test_escaped_uppercase(self):
400
401
        """Uppercase letters are escaped for safety on Windows"""
401
 
        my_store = store.TransportStore(MemoryTransport(), escaped=True)
 
402
        my_store = TransportStore(MemoryTransport(), prefixed=True,
 
403
            escaped=True)
402
404
        # a particularly perverse file-id! :-)
403
 
        self.assertEquals(my_store._escape_file_id('C:<>'), '%43%3a%3c%3e')
 
405
        self.assertEquals(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
404
406
 
405
407
 
406
408
class TestVersionFileStore(TestCaseWithTransport):
407
409
 
 
410
    def get_scope(self):
 
411
        return self._transaction
 
412
 
408
413
    def setUp(self):
409
414
        super(TestVersionFileStore, self).setUp()
410
 
        self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
 
415
        self.vfstore = VersionedFileStore(MemoryTransport(),
 
416
            versionedfile_class=WeaveFile)
 
417
        self.vfstore.get_scope = self.get_scope
 
418
        self._transaction = None
411
419
 
412
420
    def test_get_weave_registers_dirty_in_write(self):
413
 
        transaction = transactions.WriteTransaction()
414
 
        vf = self.vfstore.get_weave_or_empty('id', transaction)
415
 
        transaction.finish()
416
 
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
417
 
        transaction = transactions.WriteTransaction()
418
 
        vf = self.vfstore.get_weave('id', transaction)
419
 
        transaction.finish()
420
 
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
421
 
 
422
 
    def test_get_weave_or_empty_readonly_fails(self):
423
 
        transaction = transactions.ReadOnlyTransaction()
424
 
        vf = self.assertRaises(errors.ReadOnlyError,
425
 
                               self.vfstore.get_weave_or_empty,
426
 
                               'id',
427
 
                               transaction)
 
421
        self._transaction = transactions.WriteTransaction()
 
422
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
 
423
        self._transaction.finish()
 
424
        self._transaction = None
 
425
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
 
426
        self._transaction = transactions.WriteTransaction()
 
427
        vf = self.vfstore.get_weave('id', self._transaction)
 
428
        self._transaction.finish()
 
429
        self._transaction = None
 
430
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
428
431
 
429
432
    def test_get_weave_readonly_cant_write(self):
430
 
        transaction = transactions.WriteTransaction()
431
 
        vf = self.vfstore.get_weave_or_empty('id', transaction)
432
 
        transaction.finish()
433
 
        transaction = transactions.ReadOnlyTransaction()
434
 
        vf = self.vfstore.get_weave_or_empty('id', transaction)
 
433
        self._transaction = transactions.WriteTransaction()
 
434
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
 
435
        self._transaction.finish()
 
436
        self._transaction = transactions.ReadOnlyTransaction()
 
437
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
435
438
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
436
439
 
 
440
    def test___iter__escaped(self):
 
441
        self.vfstore = VersionedFileStore(MemoryTransport(),
 
442
            prefixed=True, escaped=True, versionedfile_class=WeaveFile)
 
443
        self.vfstore.get_scope = self.get_scope
 
444
        self._transaction = transactions.WriteTransaction()
 
445
        vf = self.vfstore.get_weave_or_empty(' ', self._transaction)
 
446
        vf.add_lines('a', [], [])
 
447
        del vf
 
448
        self._transaction.finish()
 
449
        self.assertEqual([' '], list(self.vfstore))