1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
1
# Copyright (C) 2006-2009, 2011 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
22
import bzrlib.bzrdir as bzrdir
23
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
24
22
import bzrlib.errors as errors
26
24
from bzrlib.inventory import Inventory
27
import bzrlib.repofmt.weaverepo as weaverepo
28
import bzrlib.repository as repository
29
from bzrlib.revision import NULL_REVISION, Revision
25
from bzrlib.revision import NULL_REVISION
30
26
from bzrlib.tests import (
32
TestCaseWithTransport,
30
from bzrlib.tests.matchers import MatchesAncestry
36
31
from bzrlib.tests.per_interrepository import (
37
32
TestCaseWithInterRepository,
41
def check_old_format_lock_error(repository_format):
42
"""Potentially ignore LockError on old formats.
44
On win32, with the old OS locks, we get a failure of double-lock when
45
we open a object in 2 objects and try to lock both.
47
On new formats, LockError would be invalid, but for old formats
48
this was not supported on Win32.
50
if sys.platform != 'win32':
53
description = repository_format.get_format_description()
54
if description in ("Repository format 4",
55
"Weave repository format 5",
56
"Weave repository format 6"):
58
# win32 OS locks are not re-entrant. So one process cannot
59
# open the same repository twice and lock them both.
60
raise TestSkipped('%s on win32 cannot open the same'
61
' repository twice in different objects'
66
36
def check_repo_format_for_funky_id_on_win32(repo):
67
if (isinstance(repo, (weaverepo.AllInOneRepository,
68
weaverepo.WeaveMetaDirRepository))
69
and sys.platform == 'win32'):
70
raise TestSkipped("funky chars does not permitted"
71
" on this platform in repository"
72
" %s" % repo.__class__.__name__)
37
if not repo._format.supports_funky_characters and sys.platform == 'win32':
38
raise TestSkipped("funky chars not allowed on this platform in repository"
39
" %s" % repo.__class__.__name__)
75
42
class TestInterRepository(TestCaseWithInterRepository):
111
78
tree_a.branch.repository.lock_write()
112
79
tree_a.branch.repository.start_write_group()
113
tree_a.branch.repository.sign_revision('rev2', bzrlib.gpg.LoopbackGPGStrategy(None))
80
tree_a.branch.repository.sign_revision('rev2',
81
bzrlib.gpg.LoopbackGPGStrategy(None))
114
82
tree_a.branch.repository.commit_write_group()
115
83
tree_a.branch.repository.unlock()
139
107
self.assertFalse(repo_b.has_revision('pizza'))
140
108
# Asking specifically for an absent revision errors.
141
109
self.assertRaises(errors.NoSuchRevision,
142
repo_b.search_missing_revision_ids, repo_a, revision_id='pizza',
110
repo_b.search_missing_revision_ids, repo_a, revision_ids=['pizza'],
143
111
find_ghosts=True)
144
112
self.assertRaises(errors.NoSuchRevision,
113
repo_b.search_missing_revision_ids, repo_a, revision_ids=['pizza'],
116
['search_missing_revision_ids(revision_id=...) was deprecated in '
117
'2.4. Use revision_ids=[...] instead.'],
118
self.assertRaises, errors.NoSuchRevision,
145
119
repo_b.search_missing_revision_ids, repo_a, revision_id='pizza',
146
120
find_ghosts=False)
151
125
# make a repository to compare against that is empty
152
126
repo_b = self.make_to_repository('empty')
153
127
repo_a = self.bzrdir.open_repository()
154
result = repo_b.search_missing_revision_ids(repo_a, revision_id='rev1')
128
result = repo_b.search_missing_revision_ids(
129
repo_a, revision_ids=['rev1'])
155
130
self.assertEqual(set(['rev1']), result.get_keys())
156
131
self.assertEqual(('search', set(['rev1']), set([NULL_REVISION]), 1),
157
132
result.get_recipe())
134
def test_search_missing_revision_ids_limit(self):
135
# The limit= argument makes fetch() limit
136
# the results to the first X topo-sorted revisions.
137
repo_b = self.make_to_repository('rev1_only')
138
repo_a = self.bzrdir.open_repository()
139
# check the test will be valid
140
self.assertFalse(repo_b.has_revision('rev2'))
141
result = repo_b.search_missing_revision_ids(repo_a, limit=1)
142
self.assertEqual(('search', set(['rev1']), set(['null:']), 1),
159
145
def test_fetch_fetches_signatures_too(self):
160
146
from_repo = self.bzrdir.open_repository()
161
147
from_signature = from_repo.get_signature_text('rev2')
215
201
rev = missing_ghost.get_revision('ghost')
216
202
inv = missing_ghost.get_inventory('ghost')
217
203
# rev must not be corrupt now
218
self.assertEqual([None, 'ghost', 'references', 'tip'],
219
missing_ghost.get_ancestry('tip'))
204
self.assertThat(['ghost', 'references', 'tip'],
205
MatchesAncestry(missing_ghost, 'tip'))