~abentley/bzrtools/bzrtools.dev

16 by abentley
Got is_clean under test, added setters/getters for pull data
1
import bzrlib
19 by abentley
librified most of the pull script
2
import os
16 by abentley
Got is_clean under test, added setters/getters for pull data
3
import os.path
19 by abentley
librified most of the pull script
4
import sys
16 by abentley
Got is_clean under test, added setters/getters for pull data
5
import tempfile
6
import shutil
7
8
def temp_branch():
9
    dirname = tempfile.mkdtemp("temp-branch")
10
    return bzrlib.Branch(dirname, init=True)
11
12
def rm_branch(br):
13
    shutil.rmtree(br.base)
14
15
def is_clean(cur_branch):
16
    """
17
    Return true if no files are modifed or unknown
18
    >>> br = temp_branch()
19
    >>> is_clean(br)
20
    True
21
    >>> fooname = os.path.join(br.base, "foo")
22
    >>> file(fooname, "wb").write("bar")
23
    >>> is_clean(br)
24
    False
25
    >>> bzrlib.add.smart_add([fooname])
26
    >>> is_clean(br)
27
    False
28
    >>> br.commit("added file")
29
    >>> is_clean(br)
30
    True
31
    >>> rm_branch(br)
32
    """
33
    old_tree = cur_branch.basis_tree()
34
    new_tree = cur_branch.working_tree()
35
    for file_state, fid, old_name, new_name, kind in \
36
        bzrlib.diff_trees(old_tree, new_tree):
37
        if file_state not in ('I', '.'):
38
            return False
39
    return True
40
41
def set_pull_data(br, location, rev_id):
19 by abentley
librified most of the pull script
42
    pull_file = file (br.controlfilename("x-pull-data"), "wb")
16 by abentley
Got is_clean under test, added setters/getters for pull data
43
    pull_file.write("%s\n%s\n" % (location, rev_id))
44
45
def get_pull_data(br):
46
    """
47
    >>> br = temp_branch()
48
    >>> get_pull_data(br)
49
    (None, None)
50
    >>> set_pull_data(br, 'http://somewhere', '888-777')
51
    >>> get_pull_data(br)
52
    ('http://somewhere', '888-777')
53
    >>> rm_branch(br)
54
    """
19 by abentley
librified most of the pull script
55
    filename = br.controlfilename("x-pull-data")
16 by abentley
Got is_clean under test, added setters/getters for pull data
56
    if not os.path.exists(filename):
57
        return (None, None)
58
    pull_file = file (filename, "rb")
59
    location, rev_id = [f.rstrip('\n') for f in pull_file]
60
    return location, rev_id
61
20 by abentley
added bzr-push command
62
def set_push_data(br, location):
63
    push_file = file (br.controlfilename("x-push-data"), "wb")
64
    push_file.write("%s\n" % location)
65
66
def get_push_data(br):
67
    """
68
    >>> br = temp_branch()
69
    >>> get_push_data(br) is None
70
    True
71
    >>> set_push_data(br, 'http://somewhere')
72
    >>> get_push_data(br)
73
    'http://somewhere'
74
    >>> rm_branch(br)
75
    """
76
    filename = br.controlfilename("x-push-data")
77
    if not os.path.exists(filename):
78
        return None
79
    push_file = file (filename, "rb")
80
    (location,) = [f.rstrip('\n') for f in push_file]
81
    return location
82
19 by abentley
librified most of the pull script
83
"""
84
>>> shell_escape('hello')
85
'\h\e\l\l\o'
86
"""
87
def shell_escape(arg):
88
    return "".join(['\\'+c for c in arg])
89
90
def safe_system(args):
91
    """
92
    >>> real_system = os.system
93
    >>> os.system = sys.stdout.write
94
    >>> safe_system(['a', 'b', 'cd'])
95
    \\a \\b \\c\\d
96
    >>> os.system = real_system
97
    """
98
    arg_str = " ".join([shell_escape(a) for a in args])
99
    return os.system(arg_str)
100
20 by abentley
added bzr-push command
101
def rsync(source, target, ssh=False):
19 by abentley
librified most of the pull script
102
    """
103
    >>> real_system = os.system
104
    >>> os.system = sys.stdout.write
105
    >>> rsync("a", "b")
106
    \\r\\s\\y\\n\\c \\-\\a\\v \\-\\-\\d\\e\\l\\e\\t\\e \\a \\b
107
    >>> os.system = real_system
108
    """
20 by abentley
added bzr-push command
109
    cmd = ["rsync", "-av", "--delete"]
110
    if ssh:
111
        cmd.extend(('-e', 'ssh'))
112
    cmd.extend((source, target))
113
    safe_system(cmd)
19 by abentley
librified most of the pull script
114
115
def pull(cur_branch, location=None):
116
    pull_location, pull_revision = get_pull_data(cur_branch)
117
    if pull_location is not None:
118
        if cur_branch.last_patch() != pull_revision:
119
            print "Aborting: This branch has had commits, so pull would lose data."
120
            sys.exit(1)
121
    if len(sys.argv) > 1:
122
        pull_location = sys.argv[1] 
123
        if not pull_location.endswith('/'):
124
            pull_location.append('/')
125
126
    if pull_location is None:
127
        print "No pull location saved.  Please specify one on the command line."
128
        sys.exit(1)
129
130
    if not is_clean(cur_branch):
131
        print "Error: This tree has uncommitted changes or unknown (?) files."
132
        sys.exit(1)
133
134
    print "Synchronizing with %s" % pull_location
135
    rsync (pull_location, cur_branch.base)
136
137
    set_pull_data(cur_branch, pull_location, cur_branch.last_patch())
138
20 by abentley
added bzr-push command
139
140
def push(cur_branch, location=None):
141
    push_location = get_push_data(cur_branch)
142
    if location is not None:
143
        push_location = location
144
    
145
    if push_location is None:
146
        print "No push location saved.  Please specify one on the command line."
147
        sys.exit(1)
148
149
    if not is_clean(cur_branch):
150
        print "Error: This tree has uncommitted changes or unknown (?) files."
151
        sys.exit(1)
152
153
    print "Pushing to %s" % push_location
154
    rsync (cur_branch.base, push_location, ssh=True)
155
156
    set_push_data(cur_branch, push_location)
157
19 by abentley
librified most of the pull script
158
def run_tests():
16 by abentley
Got is_clean under test, added setters/getters for pull data
159
    import doctest
160
    bzrlib.trace.create_tracefile([])
18 by abentley
Finished implementing bzr-pull
161
    result = doctest.testmod()
19 by abentley
librified most of the pull script
162
    if result[1] > 0:
163
        if result[0] == 0:
164
            print "All tests passed"
165
    else:
166
        print "No tests to run"
167
if __name__ == "__main__":
168
    run_tests()