1
# Copyright (C) 2005 Canonical Ltd
1
# Copyright (C) 2005 by Canonical Development Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
19
from cStringIO import StringIO
23
import bzrlib.errors as errors
24
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
22
from bzrlib.errors import BzrError, UnlistableStore
23
from bzrlib.store import copy_all
25
24
from bzrlib.transport.local import LocalTransport
25
from bzrlib.transport import NoSuchFile
26
from bzrlib.store.compressed_text import CompressedTextStore
26
27
from bzrlib.store.text import TextStore
27
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
28
29
import bzrlib.store as store
29
import bzrlib.transactions as transactions
30
30
import bzrlib.transport as transport
31
31
from bzrlib.transport.memory import MemoryTransport
34
34
class TestStores(object):
35
"""Mixin template class that provides some common tests for stores"""
37
36
def check_content(self, store, fileid, value):
38
37
f = store.get(fileid)
39
38
self.assertEqual(f.read(), value)
41
def test_add_str_deprecated(self):
43
store = self.get_store('a')
44
self.callDeprecated(['Passing a string to Store.add'
45
' was deprecated in version 0.11.'],
46
store.add, 'foo', '1')
47
self.assertEqual('foo', store.get('1').read())
49
40
def fill_store(self, store):
50
41
store.add(StringIO('hello'), 'a')
51
42
store.add(StringIO('other'), 'b')
58
49
store_a = self.get_store('a')
59
store_a.add(StringIO('foo'), '1')
50
store_a.add('foo', '1')
61
52
store_b = self.get_store('b')
62
store_b.copy_all_ids(store_a)
53
copy_all(store_a, store_b)
63
54
self.assertEqual(store_a.get('1').read(), 'foo')
64
55
self.assertEqual(store_b.get('1').read(), 'foo')
65
56
# TODO: Switch the exception form UnlistableStore to
89
80
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
91
def get_store(self, path=u'.'):
92
t = transport.get_transport(path)
93
return TextStore(t, compressed=True)
82
def get_store(self, path='.'):
83
t = LocalTransport(path)
84
return CompressedTextStore(t)
95
86
def test_total_size(self):
96
store = self.get_store(u'.')
87
store = self.get_store('.')
97
88
store.register_suffix('dsc')
98
89
store.add(StringIO('goodbye'), '123123')
99
90
store.add(StringIO('goodbye2'), '123123', 'dsc')
101
92
self.assertEqual(store.total_size(), (2, 55))
103
94
def test__relpath_suffixed(self):
104
my_store = TextStore(MockTransport(),
105
prefixed=True, compressed=True)
95
my_store = CompressedTextStore(MockTransport(), True)
106
96
my_store.register_suffix('dsc')
107
self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
97
self.assertEqual('45/foo.dsc.gz', my_store._relpath('foo', ['dsc']))
110
100
class TestMemoryStore(TestCase):
112
102
def get_store(self):
113
return TextStore(MemoryTransport())
103
return store.ImmutableMemoryStore()
105
def test_imports(self):
106
from bzrlib.store import ImmutableMemoryStore
115
108
def test_add_and_retrieve(self):
116
109
store = self.get_store()
117
110
store.add(StringIO('hello'), 'aa')
146
139
class TestTextStore(TestCaseInTempDir, TestStores):
148
def get_store(self, path=u'.'):
149
t = transport.get_transport(path)
150
return TextStore(t, compressed=False)
141
def get_store(self, path='.'):
142
t = LocalTransport(path)
152
145
def test_total_size(self):
153
146
store = self.get_store()
161
154
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
164
class TestMixedTextStore(TestCaseInTempDir, TestStores):
166
def get_store(self, path=u'.', compressed=True):
167
t = transport.get_transport(path)
168
return TextStore(t, compressed=compressed)
170
def test_get_mixed(self):
171
cs = self.get_store(u'.', compressed=True)
172
s = self.get_store(u'.', compressed=False)
173
cs.add(StringIO('hello there'), 'a')
175
self.failUnlessExists('a.gz')
176
self.failIf(os.path.lexists('a'))
178
self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
180
self.assertEquals(cs.has_id('a'), True)
181
self.assertEquals(s.has_id('a'), True)
182
self.assertEquals(cs.get('a').read(), 'hello there')
183
self.assertEquals(s.get('a').read(), 'hello there')
185
self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
187
s.add(StringIO('goodbye'), 'b')
188
self.failUnlessExists('b')
189
self.failIf(os.path.lexists('b.gz'))
190
self.assertEquals(open('b').read(), 'goodbye')
192
self.assertEquals(cs.has_id('b'), True)
193
self.assertEquals(s.has_id('b'), True)
194
self.assertEquals(cs.get('b').read(), 'goodbye')
195
self.assertEquals(s.get('b').read(), 'goodbye')
197
self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
199
157
class MockTransport(transport.Transport):
200
158
"""A fake transport for testing with."""
274
232
def test__relpath_simple_suffixed(self):
275
233
my_store = store.TransportStore(MockTransport())
234
my_store.register_suffix('gz')
276
235
my_store.register_suffix('bar')
277
my_store.register_suffix('baz')
278
self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
279
self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
236
self.assertEqual('foo.gz', my_store._relpath('foo', ['gz']))
237
self.assertEqual('foo.gz.bar', my_store._relpath('foo', ['gz', 'bar']))
281
239
def test__relpath_prefixed_suffixed(self):
282
240
my_store = store.TransportStore(MockTransport(), True)
241
my_store.register_suffix('gz')
283
242
my_store.register_suffix('bar')
284
my_store.register_suffix('baz')
285
self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
286
self.assertEqual('45/foo.bar.baz',
287
my_store._relpath('foo', ['bar', 'baz']))
243
self.assertEqual('45/foo.gz', my_store._relpath('foo', ['gz']))
244
self.assertEqual('45/foo.gz.bar',
245
my_store._relpath('foo', ['gz', 'bar']))
289
247
def test_add_simple(self):
290
248
stream = StringIO("content")
312
270
my_store.add(stream, "foo", 'dsc')
313
271
self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
315
def get_populated_store(self, prefixed=False,
316
store_class=TextStore, compressed=False):
317
my_store = store_class(MemoryTransport(), prefixed,
318
compressed=compressed)
273
def get_populated_store(self, prefixed=False, store_class=TextStore):
274
my_store = store_class(MemoryTransport(), prefixed)
319
275
my_store.register_suffix('sig')
320
276
stream = StringIO("signature")
321
277
my_store.add(stream, "foo", 'sig')
364
320
my_store.get('missing', 'sig').read())
366
322
def test___iter__no_suffix(self):
367
my_store = TextStore(MemoryTransport(),
368
prefixed=False, compressed=False)
323
my_store = TextStore(MemoryTransport(), False)
369
324
stream = StringIO("content")
370
325
my_store.add(stream, "foo")
371
326
self.assertEqual(set(['foo']),
380
335
def test___iter__compressed(self):
381
336
self.assertEqual(set(['foo']),
382
337
set(self.get_populated_store(
383
compressed=True).__iter__()))
338
store_class=CompressedTextStore).__iter__()))
384
339
self.assertEqual(set(['foo']),
385
340
set(self.get_populated_store(
386
True, compressed=True).__iter__()))
341
True, CompressedTextStore).__iter__()))
388
343
def test___len__(self):
389
344
self.assertEqual(1, len(self.get_populated_store()))
391
346
def test_copy_suffixes(self):
392
347
from_store = self.get_populated_store()
393
to_store = TextStore(MemoryTransport(),
394
prefixed=True, compressed=True)
348
to_store = CompressedTextStore(MemoryTransport(), True)
395
349
to_store.register_suffix('sig')
396
to_store.copy_all_ids(from_store)
350
copy_all(from_store, to_store)
397
351
self.assertEqual(1, len(to_store))
398
352
self.assertEqual(set(['foo']), set(to_store.__iter__()))
399
353
self.assertEqual('content', to_store.get('foo').read())
403
357
def test_relpath_escaped(self):
404
358
my_store = store.TransportStore(MemoryTransport())
405
359
self.assertEqual('%25', my_store._relpath('%'))
407
def test_escaped_uppercase(self):
408
"""Uppercase letters are escaped for safety on Windows"""
409
my_store = store.TransportStore(MemoryTransport(), escaped=True)
410
# a particularly perverse file-id! :-)
411
self.assertEquals(my_store._escape_file_id('C:<>'), '%43%3a%3c%3e')
414
class TestVersionFileStore(TestCaseWithTransport):
417
super(TestVersionFileStore, self).setUp()
418
self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
420
def test_get_weave_registers_dirty_in_write(self):
421
transaction = transactions.WriteTransaction()
422
vf = self.vfstore.get_weave_or_empty('id', transaction)
424
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
425
transaction = transactions.WriteTransaction()
426
vf = self.vfstore.get_weave('id', transaction)
428
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
430
def test_get_weave_or_empty_readonly_fails(self):
431
transaction = transactions.ReadOnlyTransaction()
432
vf = self.assertRaises(errors.ReadOnlyError,
433
self.vfstore.get_weave_or_empty,
437
def test_get_weave_readonly_cant_write(self):
438
transaction = transactions.WriteTransaction()
439
vf = self.vfstore.get_weave_or_empty('id', transaction)
441
transaction = transactions.ReadOnlyTransaction()
442
vf = self.vfstore.get_weave_or_empty('id', transaction)
443
self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])