~abentley/bzrtools/bzrtools.dev

« back to all changes in this revision

Viewing changes to upstream_import.py

  • Committer: Aaron Bentley
  • Date: 2006-12-12 22:58:12 UTC
  • Revision ID: abentley@panoramicfeedback.com-20061212225812-9xwer0h2uqjz2lxb
Generalize tests for zip

Show diffs side-by-side

added added

removed removed

Lines of Context:
3
3
from bz2 import BZ2File
4
4
import errno
5
5
import os
 
6
from shutil import rmtree
6
7
from StringIO import StringIO
7
 
import stat
8
8
import tarfile
 
9
from unittest import makeSuite
9
10
import zipfile
10
11
 
11
 
from bzrlib import generate_ids
12
12
from bzrlib.bzrdir import BzrDir
13
13
from bzrlib.errors import NoSuchFile, BzrCommandError, NotBranchError
14
 
from bzrlib.osutils import (pathjoin, isdir, file_iterator, basename,
15
 
                            file_kind)
 
14
from bzrlib.osutils import pathjoin, isdir, file_iterator
 
15
from bzrlib.tests import TestCaseInTempDir
16
16
from bzrlib.trace import warning
17
17
from bzrlib.transform import TreeTransform, resolve_conflicts, cook_conflicts
18
18
from bzrlib.workingtree import WorkingTree
19
19
 
20
20
class ZipFileWrapper(object):
21
21
 
22
 
    def __init__(self, fileobj, mode):
23
 
        self.zipfile = zipfile.ZipFile(fileobj, mode)
 
22
    def __init__(self, zipfile):
 
23
        self.zipfile = zipfile
24
24
 
25
25
    def getmembers(self):
26
26
        for info in self.zipfile.infolist():
35
35
        else:
36
36
            self.zipfile.write(filename)
37
37
 
 
38
 
38
39
    def close(self):
39
40
        self.zipfile.close()
40
41
 
46
47
        self.type = None
47
48
        self.name = info.filename
48
49
        self.zipfile = zipfile
49
 
        self.mode = 0666
50
50
 
51
51
    def isdir(self):
52
52
        # Really? Eeeew!
57
57
        return not self.isdir()
58
58
 
59
59
 
60
 
class DirWrapper(object):
61
 
    def __init__(self, fileobj, mode='r'):
62
 
        assert mode == 'r', mode
63
 
        self.root = os.path.realpath(fileobj.read())
64
 
 
65
 
    def __repr__(self):
66
 
        return 'DirWrapper(%r)' % self.root
67
 
 
68
 
    def getmembers(self, subdir=None):
69
 
        if subdir is not None:
70
 
            mydir = pathjoin(self.root, subdir)
71
 
        else:
72
 
            mydir = self.root
73
 
        for child in os.listdir(mydir):
74
 
            if subdir is not None:
75
 
                child = pathjoin(subdir, child)
76
 
            fi = FileInfo(self.root, child)
77
 
            yield fi
78
 
            if fi.isdir():
79
 
                for v in self.getmembers(child):
80
 
                    yield v
81
 
 
82
 
    def extractfile(self, member):
83
 
        return open(member.fullpath)
84
 
 
85
 
 
86
 
class FileInfo(object):
87
 
 
88
 
    def __init__(self, root, filepath):
89
 
        self.fullpath = pathjoin(root, filepath)
90
 
        self.root = root
91
 
        if filepath != '':
92
 
            self.name = pathjoin(basename(root), filepath)
93
 
        else:
94
 
            print 'root %r' % root
95
 
            self.name = basename(root)
96
 
        self.type = None
97
 
        stat = os.lstat(self.fullpath)
98
 
        self.mode = stat.st_mode
99
 
        if self.isdir():
100
 
            self.name += '/'
101
 
 
102
 
    def __repr__(self):
103
 
        return 'FileInfo(%r)' % self.name
104
 
 
105
 
    def isreg(self):
106
 
        return stat.S_ISREG(self.mode)
107
 
 
108
 
    def isdir(self):
109
 
        return stat.S_ISDIR(self.mode)
110
 
 
111
 
        
112
60
def top_directory(path):
113
61
    """Return the top directory given in a path."""
114
62
    dirname = os.path.dirname(path)
125
73
    possible_prefix = None
126
74
    for name in names:
127
75
        name_top = top_directory(name)
128
 
        if name_top == '':
129
 
            return None
130
76
        if possible_prefix is None:
131
77
            possible_prefix = name_top
132
78
        else:
165
111
    import_archive(tree, tar_file)
166
112
 
167
113
def import_zip(tree, zip_input):
168
 
    zip_file = ZipFileWrapper(zip_input, 'r')
 
114
    zip_file = ZipFileWrapper(zipfile.ZipFile(zip_input))
169
115
    import_archive(tree, zip_file)
170
116
 
171
 
def import_dir(tree, dir_input):
172
 
    dir_file = DirWrapper(dir_input)
173
 
    import_archive(tree, dir_file)
174
 
 
175
117
def import_archive(tree, archive_file):
176
118
    prefix = common_directory(names_of_files(archive_file))
177
119
    tt = TreeTransform(tree)
194
136
        relative_path = member.name 
195
137
        if prefix is not None:
196
138
            relative_path = relative_path[len(prefix)+1:]
197
 
            relative_path = relative_path.rstrip('/')
198
139
        if relative_path == '':
199
140
            continue
200
141
        add_implied_parents(implied_parents, relative_path)
202
143
        added.add(relative_path.rstrip('/'))
203
144
        path = tree.abspath(relative_path)
204
145
        if member.name in seen:
205
 
            if tt.final_kind(trans_id) == 'file':
206
 
                tt.set_executability(None, trans_id)
207
146
            tt.cancel_creation(trans_id)
208
147
        seen.add(member.name)
209
148
        if member.isreg():
210
149
            tt.create_file(file_iterator(archive_file.extractfile(member)), 
211
150
                           trans_id)
212
 
            executable = (member.mode & 0111) != 0
213
 
            tt.set_executability(executable, trans_id)
214
151
        elif member.isdir():
215
152
            do_directory(tt, trans_id, tree, relative_path, path)
216
153
        elif member.issym():
217
154
            tt.create_symlink(member.linkname, trans_id)
218
 
        else:
219
 
            continue
220
 
        if tt.tree_file_id(trans_id) is None:
221
 
            name = basename(member.name.rstrip('/'))
222
 
            file_id = generate_ids.gen_file_id(name)
223
 
            tt.version_file(file_id, trans_id)
224
155
 
225
156
    for relative_path in implied_parents.difference(added):
226
157
        if relative_path == "":
228
159
        trans_id = tt.trans_id_tree_path(relative_path)
229
160
        path = tree.abspath(relative_path)
230
161
        do_directory(tt, trans_id, tree, relative_path, path)
231
 
        if tt.tree_file_id(trans_id) is None:
232
 
            tt.version_file(trans_id, trans_id)
233
162
        added.add(relative_path)
234
163
 
235
 
    for path in removed.difference(added):
236
 
        tt.unversion_file(tt.trans_id_tree_path(path))
237
 
 
238
164
    for conflict in cook_conflicts(resolve_conflicts(tt), tt):
239
165
        warning(conflict)
240
166
    tt.apply()
 
167
    update_ids(tree, added, removed)
 
168
 
 
169
 
 
170
def update_ids(tree, added, removed):
 
171
    """Make sure that all present files files have file_ids.
 
172
    """
 
173
    # XXX detect renames
 
174
    new = added.difference(removed)
 
175
    deleted = removed.difference(added)
 
176
    tree.add(sorted(new))
 
177
    tree.remove(sorted(deleted, reverse=True))
241
178
 
242
179
 
243
180
def do_import(source, tree_directory=None):
274
211
                tar_input.close()
275
212
        elif source.endswith('.zip'):
276
213
            import_zip(tree, open(source, 'rb'))
277
 
        elif file_kind(source) == 'directory':
278
 
            s = StringIO(source)
279
 
            s.seek(0)
280
 
            import_dir(tree, s)
281
214
        else:
282
215
            raise BzrCommandError('Unhandled import source')
283
216
    finally:
284
217
        tree.unlock()
 
218
 
 
219
class TestImport(TestCaseInTempDir):
 
220
 
 
221
    def make_tar(self, mode='w'):
 
222
        def maker(fileobj):
 
223
            return tarfile.open('project-0.1.tar', mode, fileobj)
 
224
        return self.make_archive(maker)
 
225
 
 
226
    def make_archive(self, maker):
 
227
        result = StringIO()
 
228
        archive_file = maker(result)
 
229
        os.mkdir('project-0.1')
 
230
        archive_file.add('project-0.1')
 
231
        os.mkdir('project-0.1/junk')
 
232
        archive_file.add('project-0.1/junk')
 
233
        
 
234
        f = file('project-0.1/README', 'wb')
 
235
        f.write('What?')
 
236
        f.close()
 
237
        archive_file.add('project-0.1/README')
 
238
 
 
239
        f = file('project-0.1/FEEDME', 'wb')
 
240
        f.write('Hungry!!')
 
241
        f.close()
 
242
        archive_file.add('project-0.1/FEEDME')
 
243
 
 
244
        archive_file.close()
 
245
        rmtree('project-0.1')
 
246
        result.seek(0)
 
247
        return result
 
248
 
 
249
    def make_tar2(self):
 
250
        result = StringIO()
 
251
        tar_file = tarfile.open('project-0.2.tar', 'w', result)
 
252
        os.mkdir('project-0.2')
 
253
        tar_file.add('project-0.2')
 
254
        
 
255
        os.mkdir('project-0.2/junk')
 
256
        tar_file.add('project-0.2/junk')
 
257
 
 
258
        f = file('project-0.2/README', 'wb')
 
259
        f.write('Now?')
 
260
        f.close()
 
261
        tar_file.add('project-0.2/README')
 
262
        tar_file.close()
 
263
 
 
264
        tar_file = tarfile.open('project-0.2.tar', 'a', result)
 
265
        tar_file.add('project-0.2/README')
 
266
 
 
267
        rmtree('project-0.2')
 
268
        return result
 
269
 
 
270
    def make_messed_tar(self):
 
271
        result = StringIO()
 
272
        tar_file = tarfile.open('project-0.1.tar', 'w', result)
 
273
        os.mkdir('project-0.1')
 
274
        tar_file.add('project-0.1')
 
275
 
 
276
        os.mkdir('project-0.2')
 
277
        tar_file.add('project-0.2')
 
278
        
 
279
        f = file('project-0.1/README', 'wb')
 
280
        f.write('What?')
 
281
        f.close()
 
282
        tar_file.add('project-0.1/README')
 
283
        tar_file.close()
 
284
        rmtree('project-0.1')
 
285
        result.seek(0)
 
286
        return result
 
287
 
 
288
    def make_zip(self):
 
289
        def maker(fileobj):
 
290
            return ZipFileWrapper(zipfile.ZipFile(fileobj, 'w'))
 
291
        return self.make_archive(maker)
 
292
 
 
293
    def test_top_directory(self):
 
294
        self.assertEqual(top_directory('ab/b/c'), 'ab')
 
295
        self.assertEqual(top_directory('/etc'), '/')
 
296
 
 
297
    def test_common_directory(self):
 
298
        self.assertEqual(common_directory(['ab/c/d', 'ab/c/e']), 'ab')
 
299
        self.assertIs(common_directory(['ab/c/d', 'ac/c/e']), None)
 
300
 
 
301
    def test_untar(self):
 
302
        tar_file = self.make_tar()
 
303
        tree = BzrDir.create_standalone_workingtree('tree')
 
304
        import_tar(tree, tar_file)
 
305
        self.assertTrue(tree.path2id('README') is not None) 
 
306
        self.assertTrue(tree.path2id('FEEDME') is not None)
 
307
        self.assertTrue(os.path.isfile(tree.abspath('README')))
 
308
        self.assertEqual(tree.inventory[tree.path2id('README')].kind, 'file')
 
309
        self.assertEqual(tree.inventory[tree.path2id('FEEDME')].kind, 'file')
 
310
        
 
311
        f = file(tree.abspath('junk/food'), 'wb')
 
312
        f.write('I like food\n')
 
313
        f.close()
 
314
 
 
315
        tar_file = self.make_tar2()
 
316
        import_tar(tree, tar_file)
 
317
        self.assertTrue(tree.path2id('README') is not None) 
 
318
        self.assertTrue(not os.path.exists(tree.abspath('FEEDME')))
 
319
 
 
320
 
 
321
    def test_untar2(self):
 
322
        tar_file = self.make_messed_tar()
 
323
        tree = BzrDir.create_standalone_workingtree('tree')
 
324
        import_tar(tree, tar_file)
 
325
        self.assertTrue(tree.path2id('project-0.1/README') is not None) 
 
326
 
 
327
    def test_untar_gzip(self):
 
328
        tar_file = self.make_tar(mode='w:gz')
 
329
        tree = BzrDir.create_standalone_workingtree('tree')
 
330
        import_tar(tree, tar_file)
 
331
        self.assertTrue(tree.path2id('README') is not None) 
 
332
 
 
333
    def test_unzip(self):
 
334
        zip_file = self.make_zip()
 
335
        tree = BzrDir.create_standalone_workingtree('tree')
 
336
        import_zip(tree, zip_file)
 
337
        self.assertTrue(tree.path2id('README') is not None) 
 
338
        self.assertTrue(tree.path2id('FEEDME') is not None)
 
339
        self.assertTrue(os.path.isfile(tree.abspath('README')))
 
340
        self.assertEqual(tree.inventory[tree.path2id('README')].kind, 'file')
 
341
        self.assertEqual(tree.inventory[tree.path2id('FEEDME')].kind, 'file')
 
342
        
 
343
        f = file(tree.abspath('junk/food'), 'wb')
 
344
        f.write('I like food\n')
 
345
        f.close()
 
346
 
 
347
 
 
348
def test_suite():
 
349
    return makeSuite(TestImport)