~abentley/bzrtools/bzrtools.dev

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from bzrlib.bzrdir import BzrDir
from bzrlib.errors import NotBranchError, NoSuchFile, BzrError
from bzrlib.rio import RioReader, rio_file, Stanza

def branch_mark(mark, location, delete=False):
    if location is None:
        location = '.'
    bzrdir = BzrDir.open_containing(location)[0]
    try:
        branch = bzrdir.open_branch()
        repository = branch.repository
    except NotBranchError:
        branch = None
        repository = bzrdir.open_repository()
    if delete is True:
        if mark is None:
            raise BzrError("Please specify a mark to delete")
        unset_mark(branch, mark)
    elif mark is None:
        print_marks(repository)
    else:
        set_mark(branch, mark)

def print_marks(repository):
    marks = get_marks(repository)
    for mark in sorted(marks.keys()):
        branches = marks[mark]
        assert len(branches) != 0
        print mark
        for branch in branches:
            print "  %s" % branch


MARK_FORMAT_1 = "Branch Marks Format 1"

def get_rio_stanzas(resource, filename, missing_to_empty=True):
    try:
        riofile = resource.get(filename)
    except NoSuchFile:
        if not missing_to_empty:
            raise
        return None, []
    header = riofile.next().rstrip('\n')
    return header, list(RioReader(riofile))


def get_marks(repository):
    header, stanzas = get_rio_stanzas(repository.control_files, 'branch-marks')
    if len(stanzas) == 0:
        return {}
    if header != MARK_FORMAT_1:
        raise BzrError("Unknown mark format: %s" % header)
    marks = {}
    for stanza in stanzas:
        mark = stanza['mark']
        if mark not in marks:
            marks[mark] = []
        marks[mark].append(stanza['branch'])
        marks[mark].sort()
    return marks

def relative_base(branch):
    return branch.repository.bzrdir.transport.clone('..').relpath(branch.base)

def set_mark(branch, mark):
    def add(marks):
        if mark not in marks:
            marks[mark] = []
        marks[mark].append(relative_base(branch))
        return marks
    return _set_mark(branch, mark, add)

def unset_mark(branch, mark):
    def remove(marks):
        try:
            marks[mark].remove(relative_base(branch))
            return marks
        except KeyError, ValueError:
            raise BzrError("Branch does not have this mark set.")
    return _set_mark(branch, mark, remove)

def _set_mark(branch, mark, mutate):
    branch.repository.lock_write()
    try:
        marks = get_marks(branch.repository)
        marks = mutate(marks)
        set_marks(branch.repository, marks)
    finally:
        branch.repository.unlock()

def set_marks(repository, marks):
    riofile = rio_file(mark_stanzas(marks), MARK_FORMAT_1)
    repository.control_files.put('branch-marks', riofile)

def mark_stanzas(marks):
    for mark,branches in marks.iteritems():
        for branch in branches:
            yield Stanza(mark=mark, branch=branch)