1
# Copyright (C) 2006-2007, 2010-2011 Aaron Bentley <aaron@aaronbentley.com>
2
# Copyright (C) 2013 Aaron Bentley <aaron@aaronbentley.com>
3
# Copyright (C) 2007 Charlie Shepherd <masterdriverz@gentoo.org>
4
# Copyright (C) 2011 Canonical Ltd.
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
# GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
1
20
from shutil import rmtree
24
revision as _mod_revision,
3
26
from bzrlib.branch import Branch
4
from bzrlib.errors import NoWorkingTree, NotLocalUrl, NotBranchError
27
from bzrlib.errors import BzrCommandError, NoWorkingTree, NotBranchError
28
from bzrlib import registry
5
29
from bzrlib.workingtree import WorkingTree
7
from errors import (NotCheckout, UncommittedCheckout, ParentMissingRevisions,
31
from errors import (NotCheckout, UncommittedCheckout, ParentMissingRevisions,
11
def zap(path, remove_branch=False):
33
from bzrlib.plugins.bzrtools.bzrtools import read_locked
36
class AllowChanged(object):
39
def check_changed(klass, wt, remove_branch):
43
class CheckChanged(object):
46
def check_changed(klass, wt, remove_branch):
47
delta = wt.changes_from(wt.basis_tree(), want_unchanged=False)
48
if delta.has_changed():
49
klass.handle_changed(wt, remove_branch)
52
class HaltOnChange(CheckChanged):
55
def handle_changed(wt, remove_branch):
56
raise UncommittedCheckout()
59
class StoreChanges(CheckChanged):
62
def handle_changed(wt, remove_branch):
63
from bzrlib.plugins.pipeline.pipeline import PipeManager
65
raise BzrCommandError('Cannot store changes in deleted branch.')
66
PipeManager.from_checkout(wt).store_uncommitted()
69
change_policy_registry = registry.Registry()
72
change_policy_registry.register('force', AllowChanged,
73
'Delete tree even if contents are modified.')
76
change_policy_registry.register('store', StoreChanges,
77
'Store changes in branch. (Requires'
81
change_policy_registry.register('check', HaltOnChange,
82
'Stop if tree contents are modified.')
85
change_policy_registry.default_key = 'check'
89
def zap(path, remove_branch=False, policy=HaltOnChange):
13
wt = WorkingTree.open(path)
91
wt = bzrdir.BzrDir.open(path).open_workingtree(path,
92
recommend_upgrade=False)
14
93
except (NoWorkingTree, NotBranchError):
15
94
raise NotCheckout(path)
16
95
tree_base = wt.bzrdir.transport.base
18
97
branch_base = branch.bzrdir.transport.base
19
98
if tree_base == branch_base:
20
99
raise NotCheckout(path)
21
delta = wt.changes_from(wt.basis_tree(), want_unchanged=False)
22
if delta.has_changed():
23
raise UncommittedCheckout()
100
policy.check_changed(wt, remove_branch)
25
102
parent_loc = branch.get_parent()
26
103
if parent_loc is None:
28
parent = Branch.open(parent_loc)
29
p_ancestry = parent.repository.get_ancestry(parent.last_revision())
30
if branch.last_revision() not in p_ancestry:
31
raise ParentMissingRevisions(branch.get_parent())
105
with read_locked(Branch.open(parent_loc)) as parent:
106
last_revision = _mod_revision.ensure_null(parent.last_revision())
107
if last_revision != _mod_revision.NULL_REVISION:
108
graph = parent.repository.get_graph()
109
heads = graph.heads([last_revision, branch.last_revision()])
110
if branch.last_revision() in heads:
111
raise ParentMissingRevisions(branch.get_parent())
34
114
t = branch.bzrdir.transport
43
123
from unittest import makeSuite
45
from bzrlib.bzrdir import BzrDir, BzrDirMetaFormat1
46
from bzrlib.branch import BranchReferenceFormat
47
from bzrlib.tests import TestCaseInTempDir
49
class TestZap(TestCaseInTempDir):
125
from bzrlib.tests import TestCaseWithTransport
128
class PipelinePluginFeature:
133
import bzrlib.plugins.pipeline
140
class TestZap(TestCaseWithTransport):
51
142
def make_checkout(self):
52
wt = BzrDir.create_standalone_workingtree('source')
54
checkout = BzrDirMetaFormat1().initialize('checkout')
55
BranchReferenceFormat().initialize(checkout, wt.branch)
56
return checkout.create_workingtree()
143
wt = bzrdir.BzrDir.create_standalone_workingtree('source')
144
return wt.branch.create_checkout('checkout', lightweight=True)
58
146
def make_checkout2(self):
59
147
wt = self.make_checkout()
60
148
wt2 = wt.branch.bzrdir.sprout('source2').open_workingtree()
62
checkout = BzrDirMetaFormat1().initialize('checkout2')
63
BranchReferenceFormat().initialize(checkout, wt2.branch)
64
return checkout.create_workingtree()
149
return wt2.branch.create_checkout('checkout2', lightweight=True)
66
151
def test_is_checkout(self):
67
152
self.assertRaises(NotCheckout, zap, '.')
68
wt = BzrDir.create_standalone_workingtree('.')
153
wt = bzrdir.BzrDir.create_standalone_workingtree('.')
69
154
self.assertRaises(NotCheckout, zap, '.')
71
156
def test_zap_works(self):
94
179
checkout.commit('commit changes to branch')
182
def make_modified_checkout(self):
183
checkout = self.make_checkout()
184
os.mkdir('checkout/foo')
188
def test_allow_modified(self):
189
self.make_modified_checkout()
190
self.assertRaises(UncommittedCheckout, zap, 'checkout')
191
zap('checkout', policy=AllowChanged)
193
def test_store(self):
194
self.requireFeature(PipelinePluginFeature)
195
checkout = self.make_modified_checkout()
196
zap('checkout', policy=StoreChanges)
197
self.assertIn('stored-transform',
198
checkout.branch.bzrdir.transport.list_dir('branch'))
200
def test_store_remove_branch(self):
201
self.requireFeature(PipelinePluginFeature)
202
checkout = self.make_modified_checkout()
203
branch = self.make_branch('branch')
204
checkout.branch.set_parent(branch.base)
205
e = self.assertRaises(BzrCommandError, zap, 'checkout',
206
policy=StoreChanges, remove_branch=True)
207
self.assertEqual('Cannot store changes in deleted branch.', str(e))
209
def test_store_remove_branch_unmodified(self):
210
self.requireFeature(PipelinePluginFeature)
211
checkout = self.make_checkout()
212
branch = self.make_branch('branch')
213
checkout.branch.set_parent(branch.base)
214
zap('checkout', policy=StoreChanges, remove_branch=True)
216
def test_zap_branch_with_unique_revision(self):
217
parent = self.make_branch_and_tree('parent')
219
new_branch = self.make_branch('new')
220
new_branch.set_parent(parent.branch.base)
221
checkout = new_branch.create_checkout('checkout', lightweight=True)
222
checkout.commit('unique')
223
self.assertRaises(ParentMissingRevisions, zap, 'checkout',
97
226
return makeSuite(TestZap)