~abentley/bzrtools/bzrtools.dev

360.1.3 by Aaron Bentley
Add experimental branch-mark command
1
from bzrlib.bzrdir import BzrDir
2
from bzrlib.errors import NotBranchError, NoSuchFile, BzrError
3
from bzrlib.rio import RioReader, rio_file, Stanza
4
5
def branch_mark(mark, location, delete=False):
6
    if location is None:
7
        location = '.'
8
    bzrdir = BzrDir.open_containing(location)[0]
9
    try:
10
        branch = bzrdir.open_branch()
11
        repository = branch.repository
12
    except NotBranchError:
13
        branch = None
14
        repository = bzrdir.open_repository()
15
    if delete is True:
16
        if mark is None:
17
            raise BzrError("Please specify a mark to delete")
18
        unset_mark(branch, mark)
360.1.4 by Aaron Bentley
Fix mark deletion
19
    elif mark is None:
360.1.3 by Aaron Bentley
Add experimental branch-mark command
20
        print_marks(repository)
21
    else:
22
        set_mark(branch, mark)
23
24
def print_marks(repository):
25
    marks = get_marks(repository)
26
    for mark in sorted(marks.keys()):
27
        branches = marks[mark]
28
        assert len(branches) != 0
29
        print mark
30
        for branch in branches:
31
            print "  %s" % branch
32
33
34
MARK_FORMAT_1 = "Branch Marks Format 1"
35
36
def get_rio_stanzas(resource, filename, missing_to_empty=True):
37
    try:
38
        riofile = resource.get(filename)
39
    except NoSuchFile:
40
        if not missing_to_empty:
41
            raise
42
        return None, []
43
    header = riofile.next().rstrip('\n')
44
    return header, list(RioReader(riofile))
45
46
47
def get_marks(repository):
48
    header, stanzas = get_rio_stanzas(repository.control_files, 'branch-marks')
49
    if len(stanzas) == 0:
50
        return {}
51
    if header != MARK_FORMAT_1:
52
        raise BzrError("Unknown mark format: %s" % header)
53
    marks = {}
54
    for stanza in stanzas:
55
        mark = stanza['mark']
56
        if mark not in marks:
57
            marks[mark] = []
58
        marks[mark].append(stanza['branch'])
59
        marks[mark].sort()
60
    return marks
61
62
def relative_base(branch):
63
    return branch.repository.bzrdir.transport.clone('..').relpath(branch.base)
64
65
def set_mark(branch, mark):
66
    def add(marks):
67
        if mark not in marks:
68
            marks[mark] = []
69
        marks[mark].append(relative_base(branch))
360.1.4 by Aaron Bentley
Fix mark deletion
70
        return marks
360.1.3 by Aaron Bentley
Add experimental branch-mark command
71
    return _set_mark(branch, mark, add)
72
73
def unset_mark(branch, mark):
74
    def remove(marks):
75
        try:
76
            marks[mark].remove(relative_base(branch))
360.1.4 by Aaron Bentley
Fix mark deletion
77
            return marks
360.1.3 by Aaron Bentley
Add experimental branch-mark command
78
        except KeyError, ValueError:
79
            raise BzrError("Branch does not have this mark set.")
80
    return _set_mark(branch, mark, remove)
81
82
def _set_mark(branch, mark, mutate):
83
    branch.repository.lock_write()
84
    try:
85
        marks = get_marks(branch.repository)
360.1.4 by Aaron Bentley
Fix mark deletion
86
        marks = mutate(marks)
360.1.3 by Aaron Bentley
Add experimental branch-mark command
87
        set_marks(branch.repository, marks)
88
    finally:
89
        branch.repository.unlock()
90
91
def set_marks(repository, marks):
92
    riofile = rio_file(mark_stanzas(marks), MARK_FORMAT_1)
93
    repository.control_files.put('branch-marks', riofile)
94
95
def mark_stanzas(marks):
96
    for mark,branches in marks.iteritems():
97
        for branch in branches:
98
            yield Stanza(mark=mark, branch=branch)