1
# Copyright (C) 2006-2010 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for weave repositories.
19
For interface tests see tests/per_repository/*.py.
"""
23
from cStringIO import StringIO
24
from stat import S_ISDIR
27
from bzrlib.bzrdir import (
30
from bzrlib.errors import (
34
from bzrlib.repository import (
38
from bzrlib.serializer import (
39
format_registry as serializer_format_registry,
41
from bzrlib.tests import (
43
TestCaseWithTransport,
46
from bzrlib.plugins.weave_fmt import xml4
47
from bzrlib.plugins.weave_fmt.bzrdir import (
50
from bzrlib.plugins.weave_fmt.repository import (
59
class TestFormat6(TestCaseWithTransport):
    """Attribute and layout checks for a freshly initialized format 6 repository.

    Each test initializes a BzrDirFormat6 control directory at the test URL
    and then a RepositoryFormat6 repository inside that control directory.
    """

    def test_attribute__fetch_order(self):
        """Weaves need topological data insertion."""
        control = BzrDirFormat6().initialize(self.get_url())
        repo = RepositoryFormat6().initialize(control)
        self.assertEqual('topological', repo._format._fetch_order)

    def test_attribute__fetch_uses_deltas(self):
        """Weaves do not reuse deltas."""
        control = BzrDirFormat6().initialize(self.get_url())
        repo = RepositoryFormat6().initialize(control)
        self.assertEqual(False, repo._format._fetch_uses_deltas)

    def test_attribute__fetch_reconcile(self):
        """Weave repositories need a reconcile after fetch."""
        control = BzrDirFormat6().initialize(self.get_url())
        repo = RepositoryFormat6().initialize(control)
        self.assertEqual(True, repo._format._fetch_reconcile)

    def test_no_ancestry_weave(self):
        """Initializing the repository must not create an ancestry weave."""
        control = BzrDirFormat6().initialize(self.get_url())
        # Only the side effect of initializing matters here; the repository
        # object itself is not inspected.
        RepositoryFormat6().initialize(control)
        # We no longer need to create the ancestry.weave file
        # since it is *never* used.
        # NOTE(review): the path argument was absent from the damaged copy of
        # this file; 'ancestry.weave' is taken from the comment above -
        # confirm against version control.
        self.assertRaises(NoSuchFile,
                          control.transport.get,
                          'ancestry.weave')

    def test_supports_external_lookups(self):
        """The format reports that it does not support external lookups."""
        control = BzrDirFormat6().initialize(self.get_url())
        repo = RepositoryFormat6().initialize(control)
        self.assertFalse(repo._format.supports_external_lookups)
96
class TestFormat7(TestCaseWithTransport):
# Tests for repositories initialized with RepositoryFormat7 inside a
# BzrDirMetaFormat1 control directory: format attributes, on-disk layout,
# and locking as observed through the repository transport.
#
# NOTE(review): the bare integers between statements and the missing
# indentation look like residue from a line-numbered listing. Gaps in those
# integers suggest that some original lines are absent from this copy; the
# notes below mark the places where that visibly affects the code. Confirm
# against version control before relying on this text.
98
def test_attribute__fetch_order(self):
99
"""Weaves need topological data insertion."""
100
control = BzrDirMetaFormat1().initialize(self.get_url())
101
repo = RepositoryFormat7().initialize(control)
102
self.assertEqual('topological', repo._format._fetch_order)
104
def test_attribute__fetch_uses_deltas(self):
105
"""Weaves do not reuse deltas."""
106
control = BzrDirMetaFormat1().initialize(self.get_url())
107
repo = RepositoryFormat7().initialize(control)
108
self.assertEqual(False, repo._format._fetch_uses_deltas)
110
def test_attribute__fetch_reconcile(self):
111
"""Weave repositories need a reconcile after fetch."""
112
control = BzrDirMetaFormat1().initialize(self.get_url())
113
repo = RepositoryFormat7().initialize(control)
114
self.assertEqual(True, repo._format._fetch_reconcile)
116
# Checks the files and directories present in a new, unshared repository,
# then commits a file whose id is 'Foo:Bar' and reads the weave stored at
# 'weaves/74/Foo%3ABar.weave'.
def test_disk_layout(self):
117
control = BzrDirMetaFormat1().initialize(self.get_url())
118
repo = RepositoryFormat7().initialize(control)
119
# in case of side effects of locking.
# NOTE(review): no locking calls follow the comment above in this copy
# (the residual numbering jumps from 119 to 123) - presumably lost; confirm.
123
# format 'Bazaar-NG Repository format 7'
125
# inventory.weave == empty_weave
126
# empty revision-store directory
127
# empty weaves directory
128
t = control.get_repository_transport(None)
129
self.assertEqualDiff('Bazaar-NG Repository format 7',
130
t.get('format').read())
131
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
132
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
133
self.assertEqualDiff('# bzr weave file v5\n'
# NOTE(review): the rest of the expected text and the separating comma
# appear to be missing here (numbering jumps from 133 to 136).
136
t.get('inventory.weave').read())
137
# Creating a file with id Foo:Bar results in a non-escaped file name on
139
control.create_branch()
140
tree = control.create_workingtree()
141
tree.add(['foo'], ['Foo:Bar'], ['file'])
142
tree.put_file_bytes_non_atomic('Foo:Bar', 'content\n')
144
tree.commit('first post', rev_id='first')
# NOTE(review): the platform check below is not attached to any visible
# try/except, and `sys` is not imported anywhere in this copy - the
# surrounding error handling seems to have been lost; confirm.
146
if sys.platform != 'win32':
148
self.knownFailure('Foo:Bar cannot be used as a file-id on windows'
151
self.assertEqualDiff(
152
'# bzr weave file v5\n'
154
'1 7fe70820e08a1aac0ef224d9c66ab66831cc4ab1\n'
# NOTE(review): most lines of the expected weave text are absent from this
# copy (numbering jumps from 154 to 162).
162
t.get('weaves/74/Foo%3ABar.weave').read())
164
# Same layout checks for a repository created with shared=True, plus the
# empty 'shared-storage' marker and the absence of 'branch-lock'.
def test_shared_disk_layout(self):
165
control = BzrDirMetaFormat1().initialize(self.get_url())
166
repo = RepositoryFormat7().initialize(control, shared=True)
168
# format 'Bazaar-NG Repository format 7'
169
# inventory.weave == empty_weave
170
# empty revision-store directory
171
# empty weaves directory
172
# a 'shared-storage' marker file.
173
# lock is not present when unlocked
174
t = control.get_repository_transport(None)
175
self.assertEqualDiff('Bazaar-NG Repository format 7',
176
t.get('format').read())
177
self.assertEqualDiff('', t.get('shared-storage').read())
178
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
179
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
180
self.assertEqualDiff('# bzr weave file v5\n'
# NOTE(review): expected-text continuation missing here as well
# (numbering jumps from 180 to 183).
183
t.get('inventory.weave').read())
184
self.assertFalse(t.has('branch-lock'))
186
def test_creates_lockdir(self):
187
"""Make sure it appears to be controlled by a LockDir existence"""
188
control = BzrDirMetaFormat1().initialize(self.get_url())
189
repo = RepositoryFormat7().initialize(control, shared=True)
190
t = control.get_repository_transport(None)
191
# TODO: Should check there is a 'lock' toplevel directory,
192
# regardless of contents
193
self.assertFalse(t.has('lock/held/info'))
# NOTE(review): the two assertions on 'lock/held/info' contradict each other
# unless the repository is locked between them; that call, and the unlock
# the later comment mentions, are not present in this copy - confirm.
196
self.assertTrue(t.has('lock/held/info'))
198
# unlock so we don't get a warning about failing to do so
201
def test_uses_lockdir(self):
202
"""repo format 7 actually locks on lockdir"""
203
base_url = self.get_url()
204
control = BzrDirMetaFormat1().initialize(base_url)
205
repo = RepositoryFormat7().initialize(control, shared=True)
206
t = control.get_repository_transport(None)
# NOTE(review): numbering jumps from 206 to 210 and no lock/unlock calls are
# visible anywhere in this test, although the assertions below expect the
# lock info file to appear and then disappear - confirm.
210
# make sure the same lock is created by opening it
211
repo = Repository.open(base_url)
213
self.assertTrue(t.has('lock/held/info'))
215
self.assertFalse(t.has('lock/held/info'))
217
# Shared repository with working trees disabled: checks the
# 'no-working-trees' marker is an empty file after
# set_make_working_trees(False) and is gone after set_make_working_trees(True).
def test_shared_no_tree_disk_layout(self):
218
control = BzrDirMetaFormat1().initialize(self.get_url())
219
repo = RepositoryFormat7().initialize(control, shared=True)
220
repo.set_make_working_trees(False)
222
# format 'Bazaar-NG Repository format 7'
224
# inventory.weave == empty_weave
225
# empty revision-store directory
226
# empty weaves directory
227
# a 'shared-storage' marker file.
228
t = control.get_repository_transport(None)
229
self.assertEqualDiff('Bazaar-NG Repository format 7',
230
t.get('format').read())
231
## self.assertEqualDiff('', t.get('lock').read())
232
self.assertEqualDiff('', t.get('shared-storage').read())
233
self.assertEqualDiff('', t.get('no-working-trees').read())
234
repo.set_make_working_trees(True)
235
self.assertFalse(t.has('no-working-trees'))
236
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
237
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
238
self.assertEqualDiff('# bzr weave file v5\n'
# NOTE(review): expected-text continuation missing here
# (numbering jumps from 238 to 241).
241
t.get('inventory.weave').read())
243
# The format object must report supports_external_lookups as false.
def test_supports_external_lookups(self):
244
control = BzrDirMetaFormat1().initialize(self.get_url())
245
repo = RepositoryFormat7().initialize(control)
246
self.assertFalse(repo._format.supports_external_lookups)
249
class TestInterWeaveRepo(TestCaseWithTransport):
# Checks which format pairings InterWeaveRepo.is_compatible accepts, and that
# InterRepository.get() returns an InterWeaveRepo instance for accepted pairs.
251
def test_is_compatible_and_registered(self):
252
# InterWeaveRepo is compatible when either side
253
# is a format 5/6/7 branch
254
from bzrlib.repofmt import knitrepo
255
formats = [RepositoryFormat5(),
# NOTE(review): the remaining elements and the closing bracket of this list
# are absent from this copy (numbering jumps from 255 to 258); the comment
# above mentions formats 5/6/7 - confirm the full list in version control.
258
incompatible_formats = [RepositoryFormat4(),
259
knitrepo.RepositoryFormatKnit1(),
# NOTE(review): the closing bracket of this list is also not visible
# (numbering jumps from 259 to 261).
261
repo_a = self.make_repository('a')
262
repo_b = self.make_repository('b')
263
is_compatible = InterWeaveRepo.is_compatible
# The repositories' _format attributes are overwritten directly so that every
# pairing can be tried without creating a repository per format.
264
for source in incompatible_formats:
265
# force incompatible left then right
266
repo_a._format = source
267
repo_b._format = formats[0]
268
self.assertFalse(is_compatible(repo_a, repo_b))
269
self.assertFalse(is_compatible(repo_b, repo_a))
270
for source in formats:
271
repo_a._format = source
272
for target in formats:
273
repo_b._format = target
274
self.assertTrue(is_compatible(repo_a, repo_b))
275
self.assertEqual(InterWeaveRepo,
276
InterRepository.get(repo_a, repo_b).__class__)
279
# Canned inventory XML that TestSerializer.test_canned_inventory wraps in a
# StringIO and passes to xml4.serializer_v4.read_inventory.
# NOTE(review): the closing </inventory> tag and the closing triple quote are
# not visible in this copy, and the bare integers between entries look like
# line-number residue rather than fixture data - confirm against version
# control.
_working_inventory_v4 = """<inventory file_id="TREE_ROOT">
280
<entry file_id="bar-20050901064931-73b4b1138abc9cd2" kind="file" name="bar" parent_id="TREE_ROOT" />
281
<entry file_id="foo-20050801201819-4139aa4a272f4250" kind="directory" name="foo" parent_id="TREE_ROOT" />
282
<entry file_id="bar-20050824000535-6bc48cfad47ed134" kind="file" name="bar" parent_id="foo-20050801201819-4139aa4a272f4250" />
286
# Canned revision XML that TestSerializer.test_unpack_revision wraps in a
# StringIO and passes to xml4.serializer_v4.read_revision.
# NOTE(review): several lines of the document (including its closing tags and
# the closing triple quote) are not visible in this copy, and the bare
# integers look like line-number residue rather than fixture data - confirm
# against version control.
_revision_v4 = """<revision committer="Martin Pool <mbp@sourcefrog.net>"
287
inventory_id="mbp@sourcefrog.net-20050905080035-e0439293f8b6b9f9"
288
inventory_sha1="e79c31c1deb64c163cf660fdedd476dd579ffd41"
289
revision_id="mbp@sourcefrog.net-20050905080035-e0439293f8b6b9f9"
290
timestamp="1125907235.212"
292
<message>- start splitting code for xml (de)serialization away from objects
293
preparatory to supporting multiple formats by a single library
296
<revision_ref revision_id="mbp@sourcefrog.net-20050905063503-43948f59fa127d92" revision_sha1="7bdf4cc8c5bdac739f8cf9b10b78cf4b68f915ff" />
302
class TestSerializer(TestCase):
303
"""Test serializer"""
# NOTE(review): the bare integers between statements and the missing
# indentation look like residue from a line-numbered listing; confirm against
# version control.
305
# The serializer registry entry '4' must be the xml4.serializer_v4 object
# itself (identity, not equality).
def test_registry(self):
306
self.assertIs(xml4.serializer_v4,
307
serializer_format_registry.get('4'))
309
def test_canned_inventory(self):
310
"""Test unpacked a canned inventory v4 file."""
311
inp = StringIO(_working_inventory_v4)
312
inv = xml4.serializer_v4.read_inventory(inp)
313
self.assertEqual(len(inv), 4)
# NOTE(review): assert_ is a deprecated alias of assertTrue in unittest;
# other tests in this file already call assertTrue.
314
self.assert_('bar-20050901064931-73b4b1138abc9cd2' in inv)
316
def test_unpack_revision(self):
317
"""Test unpacking a canned revision v4"""
318
inp = StringIO(_revision_v4)
319
rev = xml4.serializer_v4.read_revision(inp)
320
eq = self.assertEqual
# NOTE(review): the two string lines below are the tails of calls whose
# opening lines are absent from this copy (numbering jumps 320 -> 322 and
# 322 -> 324). They are presumably eq(...) checks on attributes of rev, but
# which attributes cannot be told from here - the canned revision carries the
# same value for both inventory_id and revision_id. Confirm.
322
"Martin Pool <mbp@sourcefrog.net>")
324
"mbp@sourcefrog.net-20050905080035-e0439293f8b6b9f9")
325
eq(len(rev.parent_ids), 1)
326
eq(rev.parent_ids[0],
327
"mbp@sourcefrog.net-20050905063503-43948f59fa127d92")