1
# Copyright (C) 2005 Canonical Ltd
1
# Copyright (C) 2005 by Canonical Development Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23
import bzrlib.errors as errors
24
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
23
from bzrlib.errors import BzrError, UnlistableStore
24
from bzrlib.store import copy_all
25
25
from bzrlib.transport.local import LocalTransport
26
from bzrlib.transport import NoSuchFile
26
27
from bzrlib.store.text import TextStore
27
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
28
29
import bzrlib.store as store
29
import bzrlib.transactions as transactions
30
30
import bzrlib.transport as transport
31
31
from bzrlib.transport.memory import MemoryTransport
38
38
f = store.get(fileid)
39
39
self.assertEqual(f.read(), value)
41
def test_add_str_deprecated(self):
43
store = self.get_store('a')
44
self.callDeprecated(['Passing a string to Store.add'
45
' was deprecated in version 0.11.'],
46
store.add, 'foo', '1')
47
self.assertEqual('foo', store.get('1').read())
49
41
def fill_store(self, store):
50
42
store.add(StringIO('hello'), 'a')
51
43
store.add(StringIO('other'), 'b')
58
50
store_a = self.get_store('a')
59
store_a.add(StringIO('foo'), '1')
51
store_a.add('foo', '1')
61
53
store_b = self.get_store('b')
62
store_b.copy_all_ids(store_a)
54
copy_all(store_a, store_b)
63
55
self.assertEqual(store_a.get('1').read(), 'foo')
64
56
self.assertEqual(store_b.get('1').read(), 'foo')
65
57
# TODO: Switch the exception from UnlistableStore to
89
81
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
91
def get_store(self, path=u'.'):
92
t = transport.get_transport(path)
83
def get_store(self, path='.'):
84
t = LocalTransport(path)
93
85
return TextStore(t, compressed=True)
95
87
def test_total_size(self):
96
store = self.get_store(u'.')
88
store = self.get_store('.')
97
89
store.register_suffix('dsc')
98
90
store.add(StringIO('goodbye'), '123123')
99
91
store.add(StringIO('goodbye2'), '123123', 'dsc')
101
93
self.assertEqual(store.total_size(), (2, 55))
103
95
def test__relpath_suffixed(self):
104
my_store = TextStore(MockTransport(),
105
prefixed=True, compressed=True)
96
my_store = TextStore(MockTransport(), True, compressed=True)
106
97
my_store.register_suffix('dsc')
107
98
self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
110
101
class TestMemoryStore(TestCase):
112
103
def get_store(self):
113
return TextStore(MemoryTransport())
104
return store.ImmutableMemoryStore()
106
def test_imports(self):
107
from bzrlib.store import ImmutableMemoryStore
115
109
def test_add_and_retrieve(self):
116
110
store = self.get_store()
117
111
store.add(StringIO('hello'), 'aa')
146
140
class TestTextStore(TestCaseInTempDir, TestStores):
148
def get_store(self, path=u'.'):
149
t = transport.get_transport(path)
142
def get_store(self, path='.'):
143
t = LocalTransport(path)
150
144
return TextStore(t, compressed=False)
152
146
def test_total_size(self):
164
158
class TestMixedTextStore(TestCaseInTempDir, TestStores):
166
def get_store(self, path=u'.', compressed=True):
167
t = transport.get_transport(path)
160
def get_store(self, path='.', compressed=True):
161
t = LocalTransport(path)
168
162
return TextStore(t, compressed=compressed)
170
164
def test_get_mixed(self):
171
cs = self.get_store(u'.', compressed=True)
172
s = self.get_store(u'.', compressed=False)
165
cs = self.get_store('.', compressed=True)
166
s = self.get_store('.', compressed=False)
173
167
cs.add(StringIO('hello there'), 'a')
175
169
self.failUnlessExists('a.gz')
364
358
my_store.get('missing', 'sig').read())
366
360
def test___iter__no_suffix(self):
367
my_store = TextStore(MemoryTransport(),
368
prefixed=False, compressed=False)
361
my_store = TextStore(MemoryTransport(), False, compressed=False)
369
362
stream = StringIO("content")
370
363
my_store.add(stream, "foo")
371
364
self.assertEqual(set(['foo']),
391
384
def test_copy_suffixes(self):
392
385
from_store = self.get_populated_store()
393
to_store = TextStore(MemoryTransport(),
394
prefixed=True, compressed=True)
386
to_store = TextStore(MemoryTransport(), True, compressed=True)
395
387
to_store.register_suffix('sig')
396
to_store.copy_all_ids(from_store)
388
copy_all(from_store, to_store)
397
389
self.assertEqual(1, len(to_store))
398
390
self.assertEqual(set(['foo']), set(to_store.__iter__()))
399
391
self.assertEqual('content', to_store.get('foo').read())
403
395
def test_relpath_escaped(self):
# Builds a TransportStore over a MemoryTransport and checks that asking
# for the relative path of the file id '%' yields the string '%25'.
404
396
my_store = store.TransportStore(MemoryTransport())
405
397
self.assertEqual('%25', my_store._relpath('%'))
407
def test_escaped_uppercase(self):
408
"""Uppercase letters are escaped for safety on Windows"""
409
my_store = store.TransportStore(MemoryTransport(), escaped=True)
410
# a particularly perverse file-id! :-)
411
self.assertEquals(my_store._escape_file_id('C:<>'), '%43%3a%3c%3e')
414
class TestVersionFileStore(TestCaseWithTransport):
417
super(TestVersionFileStore, self).setUp()
418
self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
420
def test_get_weave_registers_dirty_in_write(self):
421
transaction = transactions.WriteTransaction()
422
vf = self.vfstore.get_weave_or_empty('id', transaction)
424
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
425
transaction = transactions.WriteTransaction()
426
vf = self.vfstore.get_weave('id', transaction)
428
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
430
def test_get_weave_or_empty_readonly_fails(self):
431
transaction = transactions.ReadOnlyTransaction()
432
vf = self.assertRaises(errors.ReadOnlyError,
433
self.vfstore.get_weave_or_empty,
437
def test_get_weave_readonly_cant_write(self):
438
transaction = transactions.WriteTransaction()
439
vf = self.vfstore.get_weave_or_empty('id', transaction)
441
transaction = transactions.ReadOnlyTransaction()
442
vf = self.vfstore.get_weave_or_empty('id', transaction)
443
self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])