~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_store.py

  • Committer: Patch Queue Manager
  • Date: 2016-04-21 04:10:52 UTC
  • mfrom: (6616.1.1 fix-en-user-guide)
  • Revision ID: pqm@pqm.ubuntu.com-20160421041052-clcye7ns1qcl2n7w
(richard-wilbur) Ensure build of English use guide always uses English text
 even when user's locale specifies a different language. (Jelmer Vernooij)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Development Ltd
2
 
 
 
1
# Copyright (C) 2005-2011, 2016 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Test Store implementations."""
18
18
 
19
19
from cStringIO import StringIO
20
20
import os
 
21
import gzip
21
22
 
22
 
from bzrlib.errors import BzrError, UnlistableStore
23
 
from bzrlib.store import copy_all
24
 
from bzrlib.transport.local import LocalTransport
25
 
from bzrlib.transport import NoSuchFile
26
 
from bzrlib.store.compressed_text import CompressedTextStore
 
23
import bzrlib.errors as errors
 
24
from bzrlib.errors import BzrError
 
25
from bzrlib.store import TransportStore
27
26
from bzrlib.store.text import TextStore
28
 
from bzrlib.selftest import TestCase, TestCaseInTempDir
29
 
import bzrlib.store as store
 
27
from bzrlib.store.versioned import VersionedFileStore
 
28
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
 
29
import bzrlib.transactions as transactions
30
30
import bzrlib.transport as transport
31
31
from bzrlib.transport.memory import MemoryTransport
 
32
from bzrlib.weave import WeaveFile
32
33
 
33
34
 
34
35
class TestStores(object):
 
36
    """Mixin template class that provides some common tests for stores"""
35
37
 
36
38
    def check_content(self, store, fileid, value):
37
39
        f = store.get(fileid)
47
49
        """Test copying"""
48
50
        os.mkdir('a')
49
51
        store_a = self.get_store('a')
50
 
        store_a.add('foo', '1')
 
52
        store_a.add(StringIO('foo'), '1')
51
53
        os.mkdir('b')
52
54
        store_b = self.get_store('b')
53
 
        copy_all(store_a, store_b)
 
55
        store_b.copy_all_ids(store_a)
54
56
        self.assertEqual(store_a.get('1').read(), 'foo')
55
57
        self.assertEqual(store_b.get('1').read(), 'foo')
56
58
        # TODO: Switch the exception form UnlistableStore to
62
64
    def test_get(self):
63
65
        store = self.get_store()
64
66
        self.fill_store(store)
65
 
    
 
67
 
66
68
        self.check_content(store, 'a', 'hello')
67
69
        self.check_content(store, 'b', 'other')
68
70
        self.check_content(store, 'c', 'something')
69
 
    
 
71
 
70
72
        # Make sure that requesting a non-existing file fails
71
73
        self.assertRaises(KeyError, self.check_content, store, 'd', None)
72
74
 
79
81
 
80
82
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
81
83
 
82
 
    def get_store(self, path='.'):
83
 
        t = LocalTransport(path)
84
 
        return CompressedTextStore(t)
 
84
    def get_store(self, path=u'.'):
 
85
        t = transport.get_transport_from_path(path)
 
86
        return TextStore(t, compressed=True)
85
87
 
86
88
    def test_total_size(self):
87
 
        store = self.get_store('.')
 
89
        store = self.get_store(u'.')
88
90
        store.register_suffix('dsc')
89
91
        store.add(StringIO('goodbye'), '123123')
90
92
        store.add(StringIO('goodbye2'), '123123', 'dsc')
91
93
        # these get gzipped - content should be stable
92
94
        self.assertEqual(store.total_size(), (2, 55))
93
 
        
 
95
 
94
96
    def test__relpath_suffixed(self):
95
 
        my_store = CompressedTextStore(MockTransport(), True)
 
97
        my_store = TextStore(MockTransport(),
 
98
                             prefixed=True, compressed=True)
96
99
        my_store.register_suffix('dsc')
97
 
        self.assertEqual('45/foo.dsc.gz', my_store._relpath('foo', ['dsc']))
 
100
        self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
98
101
 
99
102
 
100
103
class TestMemoryStore(TestCase):
101
 
    
 
104
 
102
105
    def get_store(self):
103
 
        return store.ImmutableMemoryStore()
104
 
    
105
 
    def test_imports(self):
106
 
        from bzrlib.store import ImmutableMemoryStore
 
106
        return TextStore(MemoryTransport())
107
107
 
108
108
    def test_add_and_retrieve(self):
109
109
        store = self.get_store()
116
116
 
117
117
    def test_missing_is_absent(self):
118
118
        store = self.get_store()
119
 
        self.failIf('aa' in store)
 
119
        self.assertFalse('aa' in store)
120
120
 
121
121
    def test_adding_fails_when_present(self):
122
122
        my_store = self.get_store()
138
138
 
139
139
class TestTextStore(TestCaseInTempDir, TestStores):
140
140
 
141
 
    def get_store(self, path='.'):
142
 
        t = LocalTransport(path)
143
 
        return TextStore(t)
 
141
    def get_store(self, path=u'.'):
 
142
        t = transport.get_transport_from_path(path)
 
143
        return TextStore(t, compressed=False)
144
144
 
145
145
    def test_total_size(self):
146
146
        store = self.get_store()
154
154
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
155
155
 
156
156
 
 
157
class TestMixedTextStore(TestCaseInTempDir, TestStores):
 
158
 
 
159
    def get_store(self, path=u'.', compressed=True):
 
160
        t = transport.get_transport_from_path(path)
 
161
        return TextStore(t, compressed=compressed)
 
162
 
 
163
    def test_get_mixed(self):
 
164
        cs = self.get_store(u'.', compressed=True)
 
165
        s = self.get_store(u'.', compressed=False)
 
166
        cs.add(StringIO('hello there'), 'a')
 
167
 
 
168
        self.assertPathExists('a.gz')
 
169
        self.assertFalse(os.path.lexists('a'))
 
170
 
 
171
        self.assertEqual(gzip.GzipFile('a.gz').read(), 'hello there')
 
172
 
 
173
        self.assertEqual(cs.has_id('a'), True)
 
174
        self.assertEqual(s.has_id('a'), True)
 
175
        self.assertEqual(cs.get('a').read(), 'hello there')
 
176
        self.assertEqual(s.get('a').read(), 'hello there')
 
177
 
 
178
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
 
179
 
 
180
        s.add(StringIO('goodbye'), 'b')
 
181
        self.assertPathExists('b')
 
182
        self.assertFalse(os.path.lexists('b.gz'))
 
183
        self.assertEqual(open('b').read(), 'goodbye')
 
184
 
 
185
        self.assertEqual(cs.has_id('b'), True)
 
186
        self.assertEqual(s.has_id('b'), True)
 
187
        self.assertEqual(cs.get('b').read(), 'goodbye')
 
188
        self.assertEqual(s.get('b').read(), 'goodbye')
 
189
 
 
190
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
 
191
 
157
192
class MockTransport(transport.Transport):
158
193
    """A fake transport for testing with."""
159
194
 
169
204
        return
170
205
 
171
206
 
172
 
class InstrumentedTransportStore(store.TransportStore):
 
207
class InstrumentedTransportStore(TransportStore):
173
208
    """An instrumented TransportStore.
174
209
 
175
210
    Here we replace template method worker methods with calls that record the
195
230
class TestMockTransport(TestCase):
196
231
 
197
232
    def test_isinstance(self):
198
 
        self.failUnless(isinstance(MockTransport(), transport.Transport))
 
233
        self.assertIsInstance(MockTransport(), transport.Transport)
199
234
 
200
235
    def test_has(self):
201
236
        self.assertEqual(False, MockTransport().has('foo'))
205
240
 
206
241
 
207
242
class TestTransportStore(TestCase):
208
 
    
 
243
 
209
244
    def test__relpath_invalid(self):
210
 
        my_store = store.TransportStore(MockTransport())
 
245
        my_store = TransportStore(MockTransport())
211
246
        self.assertRaises(ValueError, my_store._relpath, '/foo')
212
247
        self.assertRaises(ValueError, my_store._relpath, 'foo/')
213
248
 
214
249
    def test_register_invalid_suffixes(self):
215
 
        my_store = store.TransportStore(MockTransport())
 
250
        my_store = TransportStore(MockTransport())
216
251
        self.assertRaises(ValueError, my_store.register_suffix, '/')
217
252
        self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
218
253
 
219
254
    def test__relpath_unregister_suffixes(self):
220
 
        my_store = store.TransportStore(MockTransport())
 
255
        my_store = TransportStore(MockTransport())
221
256
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
222
257
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
223
258
 
224
259
    def test__relpath_simple(self):
225
 
        my_store = store.TransportStore(MockTransport())
 
260
        my_store = TransportStore(MockTransport())
226
261
        self.assertEqual("foo", my_store._relpath('foo'))
227
262
 
228
263
    def test__relpath_prefixed(self):
229
 
        my_store = store.TransportStore(MockTransport(), True)
 
264
        my_store = TransportStore(MockTransport(), True)
230
265
        self.assertEqual('45/foo', my_store._relpath('foo'))
231
266
 
232
267
    def test__relpath_simple_suffixed(self):
233
 
        my_store = store.TransportStore(MockTransport())
234
 
        my_store.register_suffix('gz')
 
268
        my_store = TransportStore(MockTransport())
235
269
        my_store.register_suffix('bar')
236
 
        self.assertEqual('foo.gz', my_store._relpath('foo', ['gz']))
237
 
        self.assertEqual('foo.gz.bar', my_store._relpath('foo', ['gz', 'bar']))
 
270
        my_store.register_suffix('baz')
 
271
        self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
 
272
        self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
238
273
 
239
274
    def test__relpath_prefixed_suffixed(self):
240
 
        my_store = store.TransportStore(MockTransport(), True)
241
 
        my_store.register_suffix('gz')
 
275
        my_store = TransportStore(MockTransport(), True)
242
276
        my_store.register_suffix('bar')
243
 
        self.assertEqual('45/foo.gz', my_store._relpath('foo', ['gz']))
244
 
        self.assertEqual('45/foo.gz.bar',
245
 
                         my_store._relpath('foo', ['gz', 'bar']))
 
277
        my_store.register_suffix('baz')
 
278
        self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
 
279
        self.assertEqual('45/foo.bar.baz',
 
280
                         my_store._relpath('foo', ['bar', 'baz']))
246
281
 
247
282
    def test_add_simple(self):
248
283
        stream = StringIO("content")
262
297
        my_store.register_suffix('dsc')
263
298
        my_store.add(stream, "foo", 'dsc')
264
299
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
265
 
        
 
300
 
266
301
    def test_add_simple_suffixed(self):
267
302
        stream = StringIO("content")
268
303
        my_store = InstrumentedTransportStore(MockTransport(), True)
270
305
        my_store.add(stream, "foo", 'dsc')
271
306
        self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
272
307
 
273
 
    def get_populated_store(self, prefixed=False, store_class=TextStore):
274
 
        my_store = store_class(MemoryTransport(), prefixed)
 
308
    def get_populated_store(self, prefixed=False,
 
309
            store_class=TextStore, compressed=False):
 
310
        my_store = store_class(MemoryTransport(), prefixed,
 
311
                               compressed=compressed)
275
312
        my_store.register_suffix('sig')
276
313
        stream = StringIO("signature")
277
314
        my_store.add(stream, "foo", 'sig')
280
317
        stream = StringIO("signature for missing base")
281
318
        my_store.add(stream, "missing", 'sig')
282
319
        return my_store
283
 
        
 
320
 
284
321
    def test_has_simple(self):
285
322
        my_store = self.get_populated_store()
286
323
        self.assertEqual(True, my_store.has_id('foo'))
320
357
                         my_store.get('missing', 'sig').read())
321
358
 
322
359
    def test___iter__no_suffix(self):
323
 
        my_store = TextStore(MemoryTransport(), False)
 
360
        my_store = TextStore(MemoryTransport(),
 
361
                             prefixed=False, compressed=False)
324
362
        stream = StringIO("content")
325
363
        my_store.add(stream, "foo")
326
364
        self.assertEqual(set(['foo']),
335
373
    def test___iter__compressed(self):
336
374
        self.assertEqual(set(['foo']),
337
375
                         set(self.get_populated_store(
338
 
                             store_class=CompressedTextStore).__iter__()))
 
376
                             compressed=True).__iter__()))
339
377
        self.assertEqual(set(['foo']),
340
378
                         set(self.get_populated_store(
341
 
                             True, CompressedTextStore).__iter__()))
 
379
                             True, compressed=True).__iter__()))
342
380
 
343
381
    def test___len__(self):
344
382
        self.assertEqual(1, len(self.get_populated_store()))
345
383
 
346
384
    def test_copy_suffixes(self):
347
385
        from_store = self.get_populated_store()
348
 
        to_store = CompressedTextStore(MemoryTransport(), True)
 
386
        to_store = TextStore(MemoryTransport(),
 
387
                             prefixed=True, compressed=True)
349
388
        to_store.register_suffix('sig')
350
 
        copy_all(from_store, to_store)
 
389
        to_store.copy_all_ids(from_store)
351
390
        self.assertEqual(1, len(to_store))
352
391
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
353
392
        self.assertEqual('content', to_store.get('foo').read())
354
393
        self.assertEqual('signature', to_store.get('foo', 'sig').read())
355
394
        self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
 
395
 
 
396
    def test_relpath_escaped(self):
 
397
        my_store = TransportStore(MemoryTransport())
 
398
        self.assertEqual('%25', my_store._relpath('%'))
 
399
 
 
400
    def test_escaped_uppercase(self):
 
401
        """Uppercase letters are escaped for safety on Windows"""
 
402
        my_store = TransportStore(MemoryTransport(), prefixed=True,
 
403
            escaped=True)
 
404
        # a particularly perverse file-id! :-)
 
405
        self.assertEqual(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
 
406
 
 
407
 
 
408
class TestVersionFileStore(TestCaseWithTransport):
 
409
 
 
410
    def get_scope(self):
 
411
        return self._transaction
 
412
 
 
413
    def setUp(self):
 
414
        super(TestVersionFileStore, self).setUp()
 
415
        self.vfstore = VersionedFileStore(MemoryTransport(),
 
416
            versionedfile_class=WeaveFile)
 
417
        self.vfstore.get_scope = self.get_scope
 
418
        self._transaction = None
 
419
 
 
420
    def test_get_weave_registers_dirty_in_write(self):
 
421
        self._transaction = transactions.WriteTransaction()
 
422
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
 
423
        self._transaction.finish()
 
424
        self._transaction = None
 
425
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
 
426
        self._transaction = transactions.WriteTransaction()
 
427
        vf = self.vfstore.get_weave('id', self._transaction)
 
428
        self._transaction.finish()
 
429
        self._transaction = None
 
430
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
 
431
 
 
432
    def test_get_weave_readonly_cant_write(self):
 
433
        self._transaction = transactions.WriteTransaction()
 
434
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
 
435
        self._transaction.finish()
 
436
        self._transaction = transactions.ReadOnlyTransaction()
 
437
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
 
438
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
 
439
 
 
440
    def test___iter__escaped(self):
 
441
        self.vfstore = VersionedFileStore(MemoryTransport(),
 
442
            prefixed=True, escaped=True, versionedfile_class=WeaveFile)
 
443
        self.vfstore.get_scope = self.get_scope
 
444
        self._transaction = transactions.WriteTransaction()
 
445
        vf = self.vfstore.get_weave_or_empty(' ', self._transaction)
 
446
        vf.add_lines('a', [], [])
 
447
        del vf
 
448
        self._transaction.finish()
 
449
        self.assertEqual([' '], list(self.vfstore))