~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_store.py

  • Committer: Patch Queue Manager
  • Date: 2012-09-28 07:22:23 UTC
  • mfrom: (6566.1.1 revno-error)
  • Revision ID: pqm@pqm.ubuntu.com-20120928072223-m8dp02j933yp9j60
(vila) Fix typo in an error message from the 'revno' command. (Matthew
 Fuller)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Development Ltd
2
 
 
 
1
# Copyright (C) 2005-2009, 2011 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Test Store implementations."""
18
18
 
20
20
import os
21
21
import gzip
22
22
 
23
 
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
24
 
from bzrlib.store import copy_all
25
 
from bzrlib.transport.local import LocalTransport
 
23
import bzrlib.errors as errors
 
24
from bzrlib.errors import BzrError
 
25
from bzrlib.store import TransportStore
26
26
from bzrlib.store.text import TextStore
27
 
from bzrlib.tests import TestCase, TestCaseInTempDir
28
 
import bzrlib.store as store
 
27
from bzrlib.store.versioned import VersionedFileStore
 
28
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
 
29
import bzrlib.transactions as transactions
29
30
import bzrlib.transport as transport
30
31
from bzrlib.transport.memory import MemoryTransport
 
32
from bzrlib.weave import WeaveFile
31
33
 
32
34
 
33
35
class TestStores(object):
47
49
        """Test copying"""
48
50
        os.mkdir('a')
49
51
        store_a = self.get_store('a')
50
 
        store_a.add('foo', '1')
 
52
        store_a.add(StringIO('foo'), '1')
51
53
        os.mkdir('b')
52
54
        store_b = self.get_store('b')
53
 
        copy_all(store_a, store_b)
 
55
        store_b.copy_all_ids(store_a)
54
56
        self.assertEqual(store_a.get('1').read(), 'foo')
55
57
        self.assertEqual(store_b.get('1').read(), 'foo')
56
58
        # TODO: Switch the exception form UnlistableStore to
62
64
    def test_get(self):
63
65
        store = self.get_store()
64
66
        self.fill_store(store)
65
 
    
 
67
 
66
68
        self.check_content(store, 'a', 'hello')
67
69
        self.check_content(store, 'b', 'other')
68
70
        self.check_content(store, 'c', 'something')
69
 
    
 
71
 
70
72
        # Make sure that requesting a non-existing file fails
71
73
        self.assertRaises(KeyError, self.check_content, store, 'd', None)
72
74
 
80
82
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
81
83
 
82
84
    def get_store(self, path=u'.'):
83
 
        t = LocalTransport(path)
 
85
        t = transport.get_transport_from_path(path)
84
86
        return TextStore(t, compressed=True)
85
87
 
86
88
    def test_total_size(self):
90
92
        store.add(StringIO('goodbye2'), '123123', 'dsc')
91
93
        # these get gzipped - content should be stable
92
94
        self.assertEqual(store.total_size(), (2, 55))
93
 
        
 
95
 
94
96
    def test__relpath_suffixed(self):
95
 
        my_store = TextStore(MockTransport(), True, compressed=True)
 
97
        my_store = TextStore(MockTransport(),
 
98
                             prefixed=True, compressed=True)
96
99
        my_store.register_suffix('dsc')
97
100
        self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
98
101
 
99
102
 
100
103
class TestMemoryStore(TestCase):
101
 
    
 
104
 
102
105
    def get_store(self):
103
 
        return store.ImmutableMemoryStore()
104
 
    
105
 
    def test_imports(self):
106
 
        from bzrlib.store import ImmutableMemoryStore
 
106
        return TextStore(MemoryTransport())
107
107
 
108
108
    def test_add_and_retrieve(self):
109
109
        store = self.get_store()
116
116
 
117
117
    def test_missing_is_absent(self):
118
118
        store = self.get_store()
119
 
        self.failIf('aa' in store)
 
119
        self.assertFalse('aa' in store)
120
120
 
121
121
    def test_adding_fails_when_present(self):
122
122
        my_store = self.get_store()
139
139
class TestTextStore(TestCaseInTempDir, TestStores):
140
140
 
141
141
    def get_store(self, path=u'.'):
142
 
        t = LocalTransport(path)
 
142
        t = transport.get_transport_from_path(path)
143
143
        return TextStore(t, compressed=False)
144
144
 
145
145
    def test_total_size(self):
157
157
class TestMixedTextStore(TestCaseInTempDir, TestStores):
158
158
 
159
159
    def get_store(self, path=u'.', compressed=True):
160
 
        t = LocalTransport(path)
 
160
        t = transport.get_transport_from_path(path)
161
161
        return TextStore(t, compressed=compressed)
162
162
 
163
163
    def test_get_mixed(self):
165
165
        s = self.get_store(u'.', compressed=False)
166
166
        cs.add(StringIO('hello there'), 'a')
167
167
 
168
 
        self.failUnlessExists('a.gz')
169
 
        self.failIf(os.path.lexists('a'))
 
168
        self.assertPathExists('a.gz')
 
169
        self.assertFalse(os.path.lexists('a'))
170
170
 
171
171
        self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
172
172
 
174
174
        self.assertEquals(s.has_id('a'), True)
175
175
        self.assertEquals(cs.get('a').read(), 'hello there')
176
176
        self.assertEquals(s.get('a').read(), 'hello there')
177
 
        
 
177
 
178
178
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
179
179
 
180
180
        s.add(StringIO('goodbye'), 'b')
181
 
        self.failUnlessExists('b')
182
 
        self.failIf(os.path.lexists('b.gz'))
 
181
        self.assertPathExists('b')
 
182
        self.assertFalse(os.path.lexists('b.gz'))
183
183
        self.assertEquals(open('b').read(), 'goodbye')
184
184
 
185
185
        self.assertEquals(cs.has_id('b'), True)
186
186
        self.assertEquals(s.has_id('b'), True)
187
187
        self.assertEquals(cs.get('b').read(), 'goodbye')
188
188
        self.assertEquals(s.get('b').read(), 'goodbye')
189
 
        
 
189
 
190
190
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
191
191
 
192
192
class MockTransport(transport.Transport):
204
204
        return
205
205
 
206
206
 
207
 
class InstrumentedTransportStore(store.TransportStore):
 
207
class InstrumentedTransportStore(TransportStore):
208
208
    """An instrumented TransportStore.
209
209
 
210
210
    Here we replace template method worker methods with calls that record the
230
230
class TestMockTransport(TestCase):
231
231
 
232
232
    def test_isinstance(self):
233
 
        self.failUnless(isinstance(MockTransport(), transport.Transport))
 
233
        self.assertIsInstance(MockTransport(), transport.Transport)
234
234
 
235
235
    def test_has(self):
236
236
        self.assertEqual(False, MockTransport().has('foo'))
240
240
 
241
241
 
242
242
class TestTransportStore(TestCase):
243
 
    
 
243
 
244
244
    def test__relpath_invalid(self):
245
 
        my_store = store.TransportStore(MockTransport())
 
245
        my_store = TransportStore(MockTransport())
246
246
        self.assertRaises(ValueError, my_store._relpath, '/foo')
247
247
        self.assertRaises(ValueError, my_store._relpath, 'foo/')
248
248
 
249
249
    def test_register_invalid_suffixes(self):
250
 
        my_store = store.TransportStore(MockTransport())
 
250
        my_store = TransportStore(MockTransport())
251
251
        self.assertRaises(ValueError, my_store.register_suffix, '/')
252
252
        self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
253
253
 
254
254
    def test__relpath_unregister_suffixes(self):
255
 
        my_store = store.TransportStore(MockTransport())
 
255
        my_store = TransportStore(MockTransport())
256
256
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
257
257
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
258
258
 
259
259
    def test__relpath_simple(self):
260
 
        my_store = store.TransportStore(MockTransport())
 
260
        my_store = TransportStore(MockTransport())
261
261
        self.assertEqual("foo", my_store._relpath('foo'))
262
262
 
263
263
    def test__relpath_prefixed(self):
264
 
        my_store = store.TransportStore(MockTransport(), True)
 
264
        my_store = TransportStore(MockTransport(), True)
265
265
        self.assertEqual('45/foo', my_store._relpath('foo'))
266
266
 
267
267
    def test__relpath_simple_suffixed(self):
268
 
        my_store = store.TransportStore(MockTransport())
 
268
        my_store = TransportStore(MockTransport())
269
269
        my_store.register_suffix('bar')
270
270
        my_store.register_suffix('baz')
271
271
        self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
272
272
        self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
273
273
 
274
274
    def test__relpath_prefixed_suffixed(self):
275
 
        my_store = store.TransportStore(MockTransport(), True)
 
275
        my_store = TransportStore(MockTransport(), True)
276
276
        my_store.register_suffix('bar')
277
277
        my_store.register_suffix('baz')
278
278
        self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
297
297
        my_store.register_suffix('dsc')
298
298
        my_store.add(stream, "foo", 'dsc')
299
299
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
300
 
        
 
300
 
301
301
    def test_add_simple_suffixed(self):
302
302
        stream = StringIO("content")
303
303
        my_store = InstrumentedTransportStore(MockTransport(), True)
317
317
        stream = StringIO("signature for missing base")
318
318
        my_store.add(stream, "missing", 'sig')
319
319
        return my_store
320
 
        
 
320
 
321
321
    def test_has_simple(self):
322
322
        my_store = self.get_populated_store()
323
323
        self.assertEqual(True, my_store.has_id('foo'))
357
357
                         my_store.get('missing', 'sig').read())
358
358
 
359
359
    def test___iter__no_suffix(self):
360
 
        my_store = TextStore(MemoryTransport(), False, compressed=False)
 
360
        my_store = TextStore(MemoryTransport(),
 
361
                             prefixed=False, compressed=False)
361
362
        stream = StringIO("content")
362
363
        my_store.add(stream, "foo")
363
364
        self.assertEqual(set(['foo']),
382
383
 
383
384
    def test_copy_suffixes(self):
384
385
        from_store = self.get_populated_store()
385
 
        to_store = TextStore(MemoryTransport(), True, compressed=True)
 
386
        to_store = TextStore(MemoryTransport(),
 
387
                             prefixed=True, compressed=True)
386
388
        to_store.register_suffix('sig')
387
 
        copy_all(from_store, to_store)
 
389
        to_store.copy_all_ids(from_store)
388
390
        self.assertEqual(1, len(to_store))
389
391
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
390
392
        self.assertEqual('content', to_store.get('foo').read())
392
394
        self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
393
395
 
394
396
    def test_relpath_escaped(self):
395
 
        my_store = store.TransportStore(MemoryTransport())
 
397
        my_store = TransportStore(MemoryTransport())
396
398
        self.assertEqual('%25', my_store._relpath('%'))
 
399
 
 
400
    def test_escaped_uppercase(self):
 
401
        """Uppercase letters are escaped for safety on Windows"""
 
402
        my_store = TransportStore(MemoryTransport(), prefixed=True,
 
403
            escaped=True)
 
404
        # a particularly perverse file-id! :-)
 
405
        self.assertEquals(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
 
406
 
 
407
 
 
408
class TestVersionFileStore(TestCaseWithTransport):
 
409
 
 
410
    def get_scope(self):
 
411
        return self._transaction
 
412
 
 
413
    def setUp(self):
 
414
        super(TestVersionFileStore, self).setUp()
 
415
        self.vfstore = VersionedFileStore(MemoryTransport(),
 
416
            versionedfile_class=WeaveFile)
 
417
        self.vfstore.get_scope = self.get_scope
 
418
        self._transaction = None
 
419
 
 
420
    def test_get_weave_registers_dirty_in_write(self):
 
421
        self._transaction = transactions.WriteTransaction()
 
422
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
 
423
        self._transaction.finish()
 
424
        self._transaction = None
 
425
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
 
426
        self._transaction = transactions.WriteTransaction()
 
427
        vf = self.vfstore.get_weave('id', self._transaction)
 
428
        self._transaction.finish()
 
429
        self._transaction = None
 
430
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
 
431
 
 
432
    def test_get_weave_readonly_cant_write(self):
 
433
        self._transaction = transactions.WriteTransaction()
 
434
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
 
435
        self._transaction.finish()
 
436
        self._transaction = transactions.ReadOnlyTransaction()
 
437
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
 
438
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
 
439
 
 
440
    def test___iter__escaped(self):
 
441
        self.vfstore = VersionedFileStore(MemoryTransport(),
 
442
            prefixed=True, escaped=True, versionedfile_class=WeaveFile)
 
443
        self.vfstore.get_scope = self.get_scope
 
444
        self._transaction = transactions.WriteTransaction()
 
445
        vf = self.vfstore.get_weave_or_empty(' ', self._transaction)
 
446
        vf.add_lines('a', [], [])
 
447
        del vf
 
448
        self._transaction.finish()
 
449
        self.assertEqual([' '], list(self.vfstore))