1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
from bzrlib.bzrdir import BzrDir
from bzrlib.errors import NotBranchError, NoSuchFile, BzrError
from bzrlib.rio import RioReader, rio_file, Stanza
def branch_mark(mark, location, delete=False):
    """Set, delete or list branch marks at ``location``.

    :param mark: Name of the mark to set or delete.  ``None`` lists every
        mark stored in the repository instead.
    :param location: Location handed to ``BzrDir.open_containing``;
        ``None`` is replaced by ``'.'``.
    :param delete: When ``True``, remove ``mark`` from the branch rather
        than setting it.
    :raises BzrError: if deletion is requested without a mark, or if a
        mark is to be set or deleted but no branch exists at ``location``.
    """
    if location is None:
        location = '.'
    bzrdir = BzrDir.open_containing(location)[0]
    try:
        branch = bzrdir.open_branch()
        repository = branch.repository
    except NotBranchError:
        # Listing marks only needs the repository, so a missing branch is
        # tolerated here and re-checked before any modification below.
        branch = None
        repository = bzrdir.open_repository()
    if mark is None:
        if delete is True:
            raise BzrError("Please specify a mark to delete")
        print_marks(repository)
        return
    if branch is None:
        # set_mark/unset_mark dereference branch.repository; fail with a
        # readable error instead of an AttributeError on None.
        raise BzrError("No branch found at %s; a mark can only be set or "
                       "deleted on a branch" % location)
    if delete is True:
        unset_mark(branch, mark)
    else:
        set_mark(branch, mark)
def print_marks(repository):
    """Print each mark found in ``repository`` with its branches.

    Marks are emitted in sorted order, one per line.  Every branch that
    carries the mark follows on its own line, prefixed by a space.
    """
    marks_by_name = get_marks(repository)
    for name in sorted(marks_by_name):
        branch_names = marks_by_name[name]
        # A mark is only expected to be listed when a branch carries it.
        assert len(branch_names) > 0
        print(name)
        for branch_name in branch_names:
            print(" %s" % branch_name)
# Format identifier for the 'branch-marks' file: set_marks() passes it to
# rio_file() and get_marks() compares it with the file's first line.
MARK_FORMAT_1 = "Branch Marks Format 1"
def get_rio_stanzas(resource, filename, missing_to_empty=True):
    """Read a header line and the rio stanzas that follow it.

    :param resource: Object whose ``get(filename)`` returns an iterator
        over the lines of the file.
    :param filename: Name of the file to fetch from ``resource``.
    :param missing_to_empty: When true (the default) a missing file is
        reported as ``(None, [])``; otherwise ``NoSuchFile`` propagates.
    :return: ``(header, stanzas)``.  ``header`` is the first line with
        trailing newlines stripped and ``stanzas`` is the list read by
        ``RioReader`` from the rest of the file.  A file containing no
        lines at all yields ``(None, [])``.
    """
    try:
        riofile = resource.get(filename)
    except NoSuchFile:
        if not missing_to_empty:
            raise
        return None, []
    try:
        header = riofile.next().rstrip('\n')
    except StopIteration:
        # A zero-length file has no header line; report it as holding no
        # stanzas rather than leaking StopIteration to the caller.
        return None, []
    return header, list(RioReader(riofile))
def get_marks(repository):
    """Load the branch marks stored in ``repository``.

    Reads the 'branch-marks' file from ``repository.control_files``.

    :return: Dict mapping each mark to the sorted list of ``branch``
        values recorded for it; empty when the file holds no stanzas.
    :raises BzrError: if stanzas are present but the header line is not
        ``MARK_FORMAT_1``.
    """
    header, stanzas = get_rio_stanzas(repository.control_files, 'branch-marks')
    if not stanzas:
        return {}
    if header != MARK_FORMAT_1:
        raise BzrError("Unknown mark format: %s" % header)
    marks = {}
    for stanza in stanzas:
        marks.setdefault(stanza['mark'], []).append(stanza['branch'])
    # Sort every list exactly once after grouping, instead of re-sorting
    # on each append.
    for branches in marks.values():
        branches.sort()
    return marks
def relative_base(branch):
    """Return ``branch.base`` relative to the parent of the repository's
    control-directory transport.
    """
    control_transport = branch.repository.bzrdir.transport
    parent_transport = control_transport.clone('..')
    return parent_transport.relpath(branch.base)
def set_mark(branch, mark):
    """Record ``mark`` against ``branch`` in its repository's marks.

    Setting a mark the branch already carries keeps a single entry rather
    than appending a duplicate, so one later delete removes it.

    :param branch: Branch to mark; identified by ``relative_base(branch)``.
    :param mark: Name of the mark to set.
    """
    def add(marks):
        location = relative_base(branch)
        branches = marks.setdefault(mark, [])
        # Skip the append when the branch is already listed for this mark.
        if location not in branches:
            branches.append(location)
        return marks
    return _set_mark(branch, mark, add)
def unset_mark(branch, mark):
    """Remove ``mark`` from ``branch`` in its repository's marks.

    :param branch: Branch to unmark; identified by ``relative_base(branch)``.
    :param mark: Name of the mark to remove.
    :raises BzrError: if the mark is unknown or not set on this branch.
    """
    def remove(marks):
        location = relative_base(branch)
        try:
            marks[mark].remove(location)
        except (KeyError, ValueError):
            # KeyError: no such mark at all.  ValueError: the mark exists
            # but not for this branch.  The tuple is required; the form
            # "except KeyError, ValueError" only catches KeyError.
            raise BzrError("Branch does not have this mark set.")
        return marks
    return _set_mark(branch, mark, remove)
def _set_mark(branch, mark, mutate):
    """Rewrite the stored marks under a repository write lock.

    The current marks are loaded, passed through ``mutate`` and the value
    it returns is written back.  The lock is released even if any step
    raises.  ``mark`` is accepted but not used by this function itself.
    """
    repo = branch.repository
    repo.lock_write()
    try:
        updated = mutate(get_marks(repo))
        set_marks(repo, updated)
    finally:
        repo.unlock()
def set_marks(repository, marks):
    """Store ``marks`` in the repository's 'branch-marks' control file.

    The marks are turned into stanzas, wrapped by ``rio_file`` together
    with ``MARK_FORMAT_1`` and handed to ``repository.control_files.put``.
    """
    stanzas = mark_stanzas(marks)
    contents = rio_file(stanzas, MARK_FORMAT_1)
    repository.control_files.put('branch-marks', contents)
def mark_stanzas(marks):
    """Yield one ``Stanza`` per (mark, branch) pair found in ``marks``.

    ``marks`` maps each mark to an iterable of branch values.
    """
    for name, branch_names in marks.iteritems():
        for branch_name in branch_names:
            yield Stanza(mark=name, branch=branch_name)
|