1
# Copyright (C) 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
from StringIO import StringIO
20
from bzrlib.branch import Branch
21
import bzrlib.errors as errors
22
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
23
from bzrlib.lockable_files import LockableFiles, TransportLock
24
from bzrlib.lockdir import LockDir
25
from bzrlib.tests import TestCaseInTempDir
26
from bzrlib.tests.test_transactions import DummyWeave
27
from bzrlib.transactions import (PassThroughTransaction,
31
from bzrlib.transport import get_transport
34
# these tests are applied in each parameterized suite for LockableFiles
35
class _TestLockableFiles_mixin(object):
37
def test_read_write(self):
38
self.assertRaises(NoSuchFile, self.lockable.get, 'foo')
39
self.assertRaises(NoSuchFile, self.lockable.get_utf8, 'foo')
40
self.lockable.lock_write()
42
unicode_string = u'bar\u1234'
43
self.assertEqual(4, len(unicode_string))
44
byte_string = unicode_string.encode('utf-8')
45
self.assertEqual(6, len(byte_string))
46
self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo',
47
StringIO(unicode_string))
48
self.lockable.put('foo', StringIO(byte_string))
49
self.assertEqual(byte_string,
50
self.lockable.get('foo').read())
51
self.assertEqual(unicode_string,
52
self.lockable.get_utf8('foo').read())
53
self.assertRaises(BzrBadParameterNotString,
54
self.lockable.put_utf8,
56
StringIO(unicode_string)
58
self.lockable.put_utf8('bar', unicode_string)
59
self.assertEqual(unicode_string,
60
self.lockable.get_utf8('bar').read())
61
self.assertEqual(byte_string,
62
self.lockable.get('bar').read())
63
self.lockable.put_bytes('raw', 'raw\xffbytes')
64
self.assertEqual('raw\xffbytes',
65
self.lockable.get('raw').read())
67
self.lockable.unlock()
70
self.lockable.lock_read()
72
self.assertRaises(ReadOnlyError, self.lockable.put, 'foo',
73
StringIO('bar\u1234'))
75
self.lockable.unlock()
77
def test_transactions(self):
78
self.assertIs(self.lockable.get_transaction().__class__,
79
PassThroughTransaction)
80
self.lockable.lock_read()
82
self.assertIs(self.lockable.get_transaction().__class__,
85
self.lockable.unlock()
86
self.assertIs(self.lockable.get_transaction().__class__,
87
PassThroughTransaction)
88
self.lockable.lock_write()
89
self.assertIs(self.lockable.get_transaction().__class__,
91
# check that finish is called:
93
self.lockable.get_transaction().register_dirty(vf)
94
self.lockable.unlock()
95
self.assertTrue(vf.finished)
97
def test__escape(self):
98
self.assertEqual('%25', self.lockable._escape('%'))
100
def test__escape_empty(self):
101
self.assertEqual('', self.lockable._escape(''))
103
def test_break_lock(self):
104
# some locks are not breakable
105
self.lockable.lock_write()
107
self.assertRaises(AssertionError, self.lockable.break_lock)
108
except NotImplementedError:
109
# this lock cannot be broken
110
self.lockable.unlock()
112
l2 = self.get_lockable()
113
orig_factory = bzrlib.ui.ui_factory
114
# silent ui - no need for stdout
115
bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
116
bzrlib.ui.ui_factory.stdin = StringIO("y\n")
120
bzrlib.ui.ui_factory = orig_factory
125
self.assertRaises(errors.LockBroken, self.lockable.unlock)
126
self.assertFalse(self.lockable.is_locked())
129
# This method of adapting tests to parameters is different to
130
# the TestProviderAdapters used elsewhere, but seems simpler for this
132
class TestLockableFiles_TransportLock(TestCaseInTempDir,
133
_TestLockableFiles_mixin):
136
super(TestLockableFiles_TransportLock, self).setUp()
137
transport = get_transport('.')
138
transport.mkdir('.bzr')
139
self.sub_transport = transport.clone('.bzr')
140
self.lockable = self.get_lockable()
141
self.lockable.create_lock()
144
super(TestLockableFiles_TransportLock, self).tearDown()
145
# free the subtransport so that we do not get a 5 second
146
# timeout due to the SFTP connection cache.
147
del self.sub_transport
149
def get_lockable(self):
150
return LockableFiles(self.sub_transport, 'my-lock', TransportLock)
153
class TestLockableFiles_LockDir(TestCaseInTempDir,
154
_TestLockableFiles_mixin):
155
"""LockableFile tests run with LockDir underneath"""
158
super(TestLockableFiles_LockDir, self).setUp()
159
self.transport = get_transport('.')
160
self.lockable = self.get_lockable()
161
# the lock creation here sets mode - test_permissions on branch
162
# tests that implicitly, but it might be a good idea to factor
163
# out the mode checking logic and have it applied to loackable files
164
# directly. RBC 20060418
165
self.lockable.create_lock()
167
def get_lockable(self):
168
return LockableFiles(self.transport, 'my-lock', LockDir)
170
def test_lock_created(self):
171
self.assertTrue(self.transport.has('my-lock'))
172
self.lockable.lock_write()
173
self.assertTrue(self.transport.has('my-lock/held/info'))
174
self.lockable.unlock()
175
self.assertFalse(self.transport.has('my-lock/held/info'))
176
self.assertTrue(self.transport.has('my-lock'))
179
# TODO: Test the lockdir inherits the right file and directory permissions
180
# from the LockableFiles.