1
# Copyright (C) 2006 by Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Test that caching works correctly."""
19
from bzrlib.disk_backed_cache import DiskBackedCache
20
from bzrlib.tests import TestCase
23
class TestDiskBackedCache(TestCase):
24
"""Tests for the disk-backed cache that don't use the disk."""
26
def get_cache(self, **kwargs):
27
return DiskBackedCache(**kwargs)
29
def test_like_dict(self):
30
"""Test that the cache acts like a dict object."""
31
cache = self.get_cache()
34
self.assertEqual(1, len(cache))
35
self.assertEqual('bar', cache['foo'])
36
self.assertEqual('bar', cache.get('foo'))
38
self.assertRaises(KeyError, cache.__getitem__, 'baz')
39
self.assertEqual(None, cache.get('baz'))
40
self.assertEqual('newval', cache.get('baz', 'newval'))
42
self.failUnless('foo' in cache)
43
self.failUnless(cache.has_key('foo'))
44
self.failIf('baz' in cache)
45
self.failIf(cache.has_key('baz'))
47
cache['tempkey'] = 'xxyyzz'
48
self.assertEqual(2, len(cache))
49
self.failUnless('tempkey' in cache)
51
self.assertEqual(['foo', 'tempkey'], sorted(cache.keys()))
52
self.assertEqual(['foo', 'tempkey'], sorted(cache.iterkeys()))
54
self.assertEqual([('foo', 'bar'), ('tempkey', 'xxyyzz')],
55
sorted(cache.iteritems()))
56
self.assertEqual(['bar', 'xxyyzz'], sorted(cache.itervalues()))
58
# Make sure values() is a list, not an iterator
59
values = cache.values()
60
self.assertEqual(2, len(values))
61
self.assertEqual(['bar', 'xxyyzz'], sorted(values))
64
self.failIf('tempkey' in cache)
65
self.assertEqual(['foo'], sorted(cache.keys()))
67
key, val = cache.popitem()
68
self.assertEqual('foo', key)
69
self.assertEqual('bar', val)
70
self.assertEqual([], cache.keys())
72
self.assertRaises(KeyError, cache.popitem)
73
self.assertEqual(0, len(cache))
75
def test_only_store_string(self):
76
"""DiskBackedCache can only store string objects."""
77
cache = self.get_cache()
79
self.assertRaises(TypeError, cache.__setitem__, 'foo', u'bar')
80
self.assertRaises(TypeError, cache.__setitem__, 'foo', 1)
81
self.assertRaises(TypeError, cache.__setitem__, 'foo', 1.0)
82
self.assertRaises(TypeError, cache.__setitem__, 'foo', object())
83
self.assertRaises(TypeError, cache.__setitem__, 'foo', object)
85
def test_funky_key(self):
86
"""We suppport any Dict supported key type"""
87
cache = self.get_cache()
90
cache[u'm\xb5'] = 'mu'
92
cache[('x', 1)] = 'x1'
94
self.failUnless('str' in cache)
95
self.failUnless(u'm\xb5' in cache)
96
self.failUnless(1 in cache)
97
self.failUnless(('x', 1) in cache)
99
def test_tracks_size(self):
100
cache = self.get_cache()
102
self.assertEqual(0, cache.cache_size)
104
self.assertEqual(3, cache.cache_size)
105
cache['baz'] = 'jiggly'
106
self.assertEqual(9, cache.cache_size)
109
self.assertEqual(6, cache.cache_size)
112
self.assertEqual(3, cache.cache_size)
115
self.assertEqual(0, cache.cache_size)
116
self.assertEqual({}, cache._dict)
118
def test_no_disk_stops_caching(self):
119
cache = self.get_cache(max_size=10, use_disk=False)
123
cache['bar'] = 'toomuch'
124
self.assertEqual(['baz', 'foo'], sorted(cache.keys()))
126
def test_disallow_replace(self):
127
cache = self.get_cache(allow_replace=False)
129
self.assertRaises(KeyError, cache.__setitem__, 'foo', 'baz')
131
def test_overflow_to_disk(self):
132
cache = self.get_cache(max_size=10)
134
cache['bar'] = '1234567890'
135
self.assertEqual(10, cache.cache_size)
136
self.assertEqual(None, cache._disk_cache)
138
# This should spill to disk
139
cache['baz'] = 'foobar'
140
self.assertNotEqual(None, cache._disk_cache)
142
self.assertEqual('foobar', cache['baz'])
144
cache._disk_cache.seek(0)
145
self.assertEqual('foobar', cache._disk_cache.read())
147
def test_flush_all(self):
148
"""All entries are written to disk on overflow if flush_all is set."""
149
cache = self.get_cache(max_size=10, flush_all=True)
151
cache['bar'] = '1234567890'
152
self.assertEqual(10, cache.cache_size)
153
self.assertEqual(None, cache._disk_cache)
155
# This should spill to disk
156
cache['baz'] = 'foobar'
157
self.assertNotEqual(None, cache._disk_cache)
159
# The entries should still be accessible, but
160
# the should all be on disk.
161
self.assertEqual('foobar', cache['baz'])
162
self.assertEqual('1234567890', cache['bar'])
164
# The order on disk doesn't matter, but existing items
165
# should be written before the new item, and since
166
# we only have 1 existing item, the order is fixed
167
cache._disk_cache.seek(0)
168
self.assertEqual('1234567890foobar', cache._disk_cache.read())