1
# Copyright (C) 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
from StringIO import StringIO
20
from bzrlib.branch import Branch
21
import bzrlib.errors as errors
22
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
23
from bzrlib.lockable_files import LockableFiles, TransportLock
24
from bzrlib.lockdir import LockDir
25
from bzrlib.tests import TestCaseInTempDir
26
from bzrlib.tests.test_transactions import DummyWeave
27
from bzrlib.transactions import (PassThroughTransaction,
31
from bzrlib.transport import get_transport
34
# these tests are applied in each parameterized suite for LockableFiles
35
class _TestLockableFiles_mixin(object):
37
def test_read_write(self):
38
self.assertRaises(NoSuchFile, self.lockable.get, 'foo')
39
self.assertRaises(NoSuchFile, self.lockable.get_utf8, 'foo')
40
self.lockable.lock_write()
42
unicode_string = u'bar\u1234'
43
self.assertEqual(4, len(unicode_string))
44
byte_string = unicode_string.encode('utf-8')
45
self.assertEqual(6, len(byte_string))
46
self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo',
47
StringIO(unicode_string))
48
self.lockable.put('foo', StringIO(byte_string))
49
self.assertEqual(byte_string,
50
self.lockable.get('foo').read())
51
self.assertEqual(unicode_string,
52
self.lockable.get_utf8('foo').read())
53
self.assertRaises(BzrBadParameterNotString,
54
self.lockable.put_utf8,
56
StringIO(unicode_string)
58
self.lockable.put_utf8('bar', unicode_string)
59
self.assertEqual(unicode_string,
60
self.lockable.get_utf8('bar').read())
61
self.assertEqual(byte_string,
62
self.lockable.get('bar').read())
64
self.lockable.unlock()
67
self.lockable.lock_read()
69
self.assertRaises(ReadOnlyError, self.lockable.put, 'foo',
70
StringIO('bar\u1234'))
72
self.lockable.unlock()
74
def test_transactions(self):
75
self.assertIs(self.lockable.get_transaction().__class__,
76
PassThroughTransaction)
77
self.lockable.lock_read()
79
self.assertIs(self.lockable.get_transaction().__class__,
82
self.lockable.unlock()
83
self.assertIs(self.lockable.get_transaction().__class__,
84
PassThroughTransaction)
85
self.lockable.lock_write()
86
self.assertIs(self.lockable.get_transaction().__class__,
88
# check that finish is called:
90
self.lockable.get_transaction().register_dirty(vf)
91
self.lockable.unlock()
92
self.assertTrue(vf.finished)
94
def test__escape(self):
95
self.assertEqual('%25', self.lockable._escape('%'))
97
def test__escape_empty(self):
98
self.assertEqual('', self.lockable._escape(''))
100
def test_break_lock(self):
101
# some locks are not breakable
102
self.lockable.lock_write()
104
self.assertRaises(AssertionError, self.lockable.break_lock)
105
except NotImplementedError:
106
# this lock cannot be broken
107
self.lockable.unlock()
109
l2 = self.get_lockable()
110
orig_factory = bzrlib.ui.ui_factory
111
# silent ui - no need for stdout
112
bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
113
bzrlib.ui.ui_factory.stdin = StringIO("y\n")
117
bzrlib.ui.ui_factory = orig_factory
122
self.assertRaises(errors.LockBroken, self.lockable.unlock)
123
self.assertFalse(self.lockable.is_locked())
126
# This method of adapting tests to parameters is different to
127
# the TestProviderAdapters used elsewhere, but seems simpler for this
129
class TestLockableFiles_TransportLock(TestCaseInTempDir,
130
_TestLockableFiles_mixin):
133
super(TestLockableFiles_TransportLock, self).setUp()
134
transport = get_transport('.')
135
transport.mkdir('.bzr')
136
self.sub_transport = transport.clone('.bzr')
137
self.lockable = self.get_lockable()
138
self.lockable.create_lock()
141
super(TestLockableFiles_TransportLock, self).tearDown()
142
# free the subtransport so that we do not get a 5 second
143
# timeout due to the SFTP connection cache.
144
del self.sub_transport
146
def get_lockable(self):
147
return LockableFiles(self.sub_transport, 'my-lock', TransportLock)
150
class TestLockableFiles_LockDir(TestCaseInTempDir,
151
_TestLockableFiles_mixin):
152
"""LockableFile tests run with LockDir underneath"""
155
super(TestLockableFiles_LockDir, self).setUp()
156
self.transport = get_transport('.')
157
self.lockable = self.get_lockable()
158
# the lock creation here sets mode - test_permissions on branch
159
# tests that implicitly, but it might be a good idea to factor
160
# out the mode checking logic and have it applied to loackable files
161
# directly. RBC 20060418
162
self.lockable.create_lock()
164
def get_lockable(self):
165
return LockableFiles(self.transport, 'my-lock', LockDir)
167
def test_lock_created(self):
168
self.assertTrue(self.transport.has('my-lock'))
169
self.lockable.lock_write()
170
self.assertTrue(self.transport.has('my-lock/held/info'))
171
self.lockable.unlock()
172
self.assertFalse(self.transport.has('my-lock/held/info'))
173
self.assertTrue(self.transport.has('my-lock'))
176
# TODO: Test the lockdir inherits the right file and directory permissions
177
# from the LockableFiles.