~abentley/bzrtools/bzrtools.dev

« back to all changes in this revision

Viewing changes to fai/pylon/util.py

  • Committer: Robert Collins
  • Date: 2005-09-13 15:11:39 UTC
  • mto: (147.2.6) (364.1.3 bzrtools)
  • mto: This revision was merged to the branch mainline in revision 324.
  • Revision ID: robertc@robertcollins.net-20050913151139-9ac920fc9d7bda31
TODOification

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import os
 
2
import tempfile
 
3
import shutil
 
4
import string
 
5
import pybaz.util
 
6
import errors
 
7
 
 
8
__docformat__ = "restructuredtext"
 
9
__doc__ = "General Utility functions"
 
10
 
 
11
def linktree(src, dest, top_exists = False):
 
12
    """Produces a hard-linked clone of the source.
 
13
 
 
14
    :param src: The directory to clone
 
15
    :type src: str
 
16
    :param dest: The name of the new directory to create
 
17
    :type dest: str
 
18
    """
 
19
    if not top_exists:
 
20
        os.mkdir(dest)
 
21
    shutil.copymode(src, dest)
 
22
    for my_file in os.listdir(src):
 
23
        srcpath = "%s/%s" % (src, my_file)
 
24
        # avoid trouble if dest is inside src
 
25
        if os.path.samefile(srcpath, dest):
 
26
            continue
 
27
        destpath = "%s/%s" % (dest, my_file)
 
28
        if os.path.isdir(srcpath) and not os.path.islink(srcpath):
 
29
            linktree(srcpath, destpath)
 
30
        else:
 
31
            os.link(srcpath, destpath)
 
32
 
 
33
 
 
34
class NewFileVersion:
 
35
    def __init__(self, final_filename):
 
36
        self.final_filename = final_filename
 
37
        (directory, suffix) = os.path.split(final_filename)
 
38
        (self.fd, self.temp_filename) = tempfile.mkstemp(dir=directory)
 
39
        self.file = os.fdopen(self.fd, "w")
 
40
 
 
41
    def write(self, str):
 
42
        self.file.write(str)
 
43
 
 
44
    def commit(self):
 
45
        os.chmod(self.temp_filename, os.stat(self.final_filename).st_mode)
 
46
        os.rename(self.temp_filename, self.final_filename)
 
47
        self.file.close()
 
48
 
 
49
 
 
50
def regex_escape(str):
 
51
    newstr = str[0:]
 
52
    special = "\{}()+?$^.*[]|"
 
53
    for char in special:
 
54
        newstr = string.replace(newstr, char, "\\"+char)
 
55
    return newstr
 
56
 
 
57
def _escape_helper(str, rep):
 
58
    return string.replace(str, rep, "\\"+rep)
 
59
 
 
60
 
 
61
def safe_unlink(my_file):
 
62
    if my_file is not None:
 
63
        os.unlink(my_file)
 
64
 
 
65
 
 
66
def tmpdir(parent=None):
 
67
    """
 
68
    Creates a temporary directory, and returns its name.
 
69
 
 
70
    :return: the directory name
 
71
    :rtype: string
 
72
    """
 
73
    return tempfile.mkdtemp("", ",,fai-", parent)
 
74
 
 
75
class iter_delete_wrapper:
 
76
    def __init__(self, iter, dir):
 
77
        self.iter = iter
 
78
        self.dir = dir
 
79
 
 
80
    def __iter__(self):
 
81
        return self.iter
 
82
 
 
83
    def __del__(self):
 
84
        if self.dir is not None:
 
85
            shutil.rmtree(self.dir)
 
86
 
 
87
def iter_pairs(iterator):
 
88
    """Returns a seqence of values as a series of pairs of even/odd values
 
89
    
 
90
    :param iter: The iterator to reinterpret as a sequence of pairs
 
91
    :type: iter
 
92
    :return: iterator of even/odd values
 
93
    :rtype: iter of (val, val)
 
94
    """
 
95
    iterator = iterator.__iter__()
 
96
    while True:
 
97
        key = iterator.next()
 
98
        value = iterator.next()
 
99
        yield (key, value)
 
100
 
 
101
def shortest(vals):
 
102
    """Given a series of values, generates the shortest one.
 
103
 
 
104
    :param vals: List of values to examine
 
105
    :type vals: list of str
 
106
    :return: the shortest values, or None for empty sequences. 
 
107
    :rtype: str or NoneType
 
108
    """
 
109
    maximum = None
 
110
    for value in vals:
 
111
        if maximum is None or len(value) < len(maximum):
 
112
            maximum = value
 
113
    return maximum
 
114
 
 
115
def difference_index(atext, btext):
 
116
    """Find the indext of the first character that differs betweeen two texts
 
117
 
 
118
    :param atext: The first text
 
119
    :type atext: str
 
120
    :param btext: The second text
 
121
    :type str: str
 
122
    :return: The index, or None if there are no differences within the range
 
123
    :rtype: int or NoneType
 
124
    """
 
125
    length = len(atext)
 
126
    if len(btext) < length:
 
127
        length = len(btext)
 
128
    for i in range(length):
 
129
        if atext[i] != btext[i]:
 
130
            return i;
 
131
    return None
 
132
 
 
133
 
 
134
def iter_untar(file, output_dir, compression = "gzip"):
 
135
    """Untars a file, with incremental status output.
 
136
 
 
137
    :param file: The file to untar
 
138
    :type file: str
 
139
    :param output_dir: The directory to output to
 
140
    :type output_dir: str
 
141
    :param compression: The compression method: "gzip", "bzip2" or None
 
142
    :type: str or NoneType
 
143
    :return: an iterator of the list of untarred files 
 
144
    :rtype: iter of str
 
145
    """
 
146
    args = ["-xvf", file, "-C", output_dir]
 
147
    if compression is not None:
 
148
        if (compression != "gzip" and compression != "bzip2"):
 
149
            raise errors.UnknownCompressionMethod(compression)
 
150
        args.append("--"+compression)
 
151
    tar_iter = pybaz.util.exec_safe_iter_stdout("tar", args)
 
152
    for line in tar_iter:
 
153
        yield line.rstrip("\n")
 
154
 
 
155
 
 
156
def untar_parent(file, output_dir, compression = "gzip"):
 
157
    """Untars a file, returns the top-level directory in the tar
 
158
 
 
159
    :param file: The file to untar
 
160
    :type file: str
 
161
    :param output_dir: The directory to output to
 
162
    :type output_dir: str
 
163
    :param compression: The compression method: "gzip", "bzip2" or None
 
164
    :type: str or NoneType
 
165
    :return: The top-level directory of the tar, or None if ambigious 
 
166
    :rtype: str or NoneType
 
167
    """
 
168
    untar_iter = iter_untar(file, output_dir, compression)
 
169
    try:
 
170
        parent_dir = untar_iter.next()
 
171
    except StopIteration:
 
172
        raise errors.NoUnambiguousParent(file, parent_dir)
 
173
    for file in untar_iter:
 
174
        if not file.startswith(parent_dir):
 
175
            raise errors.NoUnambiguousParent(file, parent_dir)
 
176
    return parent_dir
 
177
 
 
178
 
 
179
def cmp_func(function):
 
180
    """Returns a lambda that uses the output of a function as a cmp key
 
181
 
 
182
    :param function: A function to produce the key value
 
183
    :return: A lambda that uses the output of the function for comparison
 
184
    """
 
185
    return lambda x, y: cmp(function(x), function(y))
 
186
 
 
187
 
 
188
 
 
189
# arch-tag: 0cf2f861-4c7e-4d88-a0dd-378c3241564a