~abentley/bzrtools/bzrtools.dev

« back to all changes in this revision

Viewing changes to tests/upstream_import.py

  • Committer: Aaron Bentley
  • Date: 2011-06-27 22:03:53 UTC
  • mfrom: (770.1.1 trunk)
  • Revision ID: aaron@aaronbentley.com-20110627220353-c7ikthkaap2amfzm
Support importing .tar.xz and .tar.lzma files.  (Jelmer)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import os
 
2
from StringIO import StringIO
 
3
from shutil import rmtree, copy2, copytree
 
4
import tarfile
 
5
import tempfile
 
6
from unittest import makeSuite
 
7
 
 
8
from bzrlib import (
 
9
    osutils,
 
10
    revision as _mod_revision,
 
11
    transform
 
12
    )
 
13
from bzrlib.bzrdir import BzrDir
 
14
from bzrlib.export.tar_exporter import export_tarball
 
15
from bzrlib.plugins.bzrtools.upstream_import import (
 
16
    common_directory,
 
17
    import_archive,
 
18
    import_tar,
 
19
    import_zip,
 
20
    import_dir,
 
21
    top_path,
 
22
    ZipFileWrapper,
 
23
)
 
24
from bzrlib.tests import (
 
25
    TestCaseInTempDir,
 
26
    TestCaseWithTransport,
 
27
    UnicodeFilenameFeature,
 
28
    )
 
29
 
 
30
 
 
31
def import_tar_broken(tree, tar_input):
 
32
    """
 
33
    Import a tarfile with names that that end in //, e.g. Feisty Python 2.5
 
34
    """
 
35
    tar_file = tarfile.open('lala', 'r', tar_input)
 
36
    for member in tar_file.members:
 
37
        if member.name.endswith('/'):
 
38
            member.name += '/'
 
39
    import_archive(tree, tar_file)
 
40
 
 
41
 
 
42
class DirFileWriter(object):
 
43
 
 
44
    def __init__(self, fileobj, mode):
 
45
        # We may be asked to 'append'.  If so, fileobj already has a path.
 
46
        # So we copy the existing tree, and overwrite afterward.
 
47
        fileobj.seek(0)
 
48
        existing = fileobj.read()
 
49
        fileobj.seek(0)
 
50
        path = tempfile.mkdtemp(dir=os.getcwd())
 
51
        if existing != '':
 
52
            # copytree requires the directory not to exist
 
53
            os.rmdir(path)
 
54
            copytree(existing, path)
 
55
        fileobj.write(path)
 
56
        self.root = path
 
57
 
 
58
    def add(self, path):
 
59
        target_path = os.path.join(self.root, path)
 
60
        parent = osutils.dirname(target_path)
 
61
        if not os.path.exists(parent):
 
62
            os.makedirs(parent)
 
63
        kind = osutils.file_kind(path)
 
64
        if kind == 'file':
 
65
            copy2(path, target_path)
 
66
        if kind == 'directory':
 
67
            os.mkdir(target_path)
 
68
 
 
69
    def close(self):
 
70
        pass
 
71
 
 
72
 
 
73
class TestImport(TestCaseInTempDir):
 
74
 
 
75
    def make_tar(self, mode='w'):
 
76
        def maker(fileobj):
 
77
            return tarfile.open('project-0.1.tar', mode, fileobj)
 
78
        return self.make_archive(maker)
 
79
 
 
80
    def make_archive(self, maker, subdir=True):
 
81
        result = StringIO()
 
82
        archive_file = maker(result)
 
83
        try:
 
84
            os.mkdir('project-0.1')
 
85
            if subdir:
 
86
                prefix='project-0.1/'
 
87
                archive_file.add('project-0.1')
 
88
            else:
 
89
                prefix=''
 
90
                os.chdir('project-0.1')
 
91
            os.mkdir(prefix + 'junk')
 
92
            archive_file.add(prefix + 'junk')
 
93
 
 
94
            f = file(prefix + 'README', 'wb')
 
95
            f.write('What?')
 
96
            f.close()
 
97
            archive_file.add(prefix + 'README')
 
98
 
 
99
            f = file(prefix + 'FEEDME', 'wb')
 
100
            f.write('Hungry!!')
 
101
            f.close()
 
102
            archive_file.add(prefix + 'FEEDME')
 
103
 
 
104
            archive_file.close()
 
105
        finally:
 
106
            if not subdir:
 
107
                os.chdir('..')
 
108
            rmtree('project-0.1')
 
109
        result.seek(0)
 
110
        return result
 
111
 
 
112
    def make_archive2(self, builder, subdir):
 
113
        result = StringIO()
 
114
        archive_file = builder(result)
 
115
        os.mkdir('project-0.2')
 
116
        try:
 
117
            if subdir:
 
118
                prefix='project-0.2/'
 
119
                archive_file.add('project-0.2')
 
120
            else:
 
121
                prefix=''
 
122
                os.chdir('project-0.2')
 
123
 
 
124
            os.mkdir(prefix + 'junk')
 
125
            archive_file.add(prefix + 'junk')
 
126
 
 
127
            f = file(prefix + 'README', 'wb')
 
128
            f.write('Now?')
 
129
            f.close()
 
130
            archive_file.add(prefix + 'README')
 
131
 
 
132
            f = file(prefix + 'README', 'wb')
 
133
            f.write('Wow?')
 
134
            f.close()
 
135
            # Add a second entry for README with different contents.
 
136
            archive_file.add(prefix + 'README')
 
137
            archive_file.close()
 
138
 
 
139
        finally:
 
140
            if not subdir:
 
141
                os.chdir('..')
 
142
        result.seek(0)
 
143
        return result
 
144
 
 
145
    def make_messed_tar(self):
 
146
        result = StringIO()
 
147
        tar_file = tarfile.open('project-0.1.tar', 'w', result)
 
148
        os.mkdir('project-0.1')
 
149
        tar_file.add('project-0.1')
 
150
 
 
151
        os.mkdir('project-0.2')
 
152
        tar_file.add('project-0.2')
 
153
 
 
154
        f = file('project-0.1/README', 'wb')
 
155
        f.write('What?')
 
156
        f.close()
 
157
        tar_file.add('project-0.1/README')
 
158
        tar_file.close()
 
159
        rmtree('project-0.1')
 
160
        result.seek(0)
 
161
        return result
 
162
 
 
163
    def make_zip(self):
 
164
        def maker(fileobj):
 
165
            return ZipFileWrapper(fileobj, 'w')
 
166
        return self.make_archive(maker)
 
167
 
 
168
    def make_tar_with_bzrdir(self):
 
169
        result = StringIO()
 
170
        tar_file = tarfile.open('tar-with-bzrdir.tar', 'w', result)
 
171
        os.mkdir('toplevel-dir')
 
172
        tar_file.add('toplevel-dir')
 
173
        os.mkdir('toplevel-dir/.bzr')
 
174
        tar_file.add('toplevel-dir/.bzr')
 
175
        tar_file.close()
 
176
        rmtree('toplevel-dir')
 
177
        result.seek(0)
 
178
        return result
 
179
 
 
180
    def test_top_path(self):
 
181
        self.assertEqual(top_path('ab/b/c'), 'ab')
 
182
        self.assertEqual(top_path('etc'), 'etc')
 
183
        self.assertEqual(top_path('project-0.1'), 'project-0.1')
 
184
 
 
185
    def test_common_directory(self):
 
186
        self.assertEqual(common_directory(['ab/c/d', 'ab/c/e']), 'ab')
 
187
        self.assertIs(common_directory(['ab/c/d', 'ac/c/e']), None)
 
188
        self.assertEqual('FEEDME', common_directory(['FEEDME']))
 
189
 
 
190
    def test_untar(self):
 
191
        def builder(fileobj, mode='w'):
 
192
            return tarfile.open('project-0.1.tar', mode, fileobj)
 
193
        self.archive_test(builder, import_tar)
 
194
 
 
195
    def test_broken_tar(self):
 
196
        def builder(fileobj, mode='w'):
 
197
            return tarfile.open('project-0.1.tar', mode, fileobj)
 
198
        self.archive_test(builder, import_tar_broken, subdir=True)
 
199
 
 
200
    def test_unzip(self):
 
201
        def builder(fileobj, mode='w'):
 
202
            return ZipFileWrapper(fileobj, mode)
 
203
        self.archive_test(builder, import_zip)
 
204
 
 
205
    def test_copydir_nosub(self):
 
206
        def builder(fileobj, mode='w'):
 
207
            return DirFileWriter(fileobj, mode)
 
208
        # It would be bogus to test with the result in a subdirectory,
 
209
        # because for directories, the input root is always the output root.
 
210
        self.archive_test(builder, import_dir)
 
211
 
 
212
    def archive_test(self, builder, importer, subdir=False):
 
213
        archive_file = self.make_archive(builder, subdir)
 
214
        tree = BzrDir.create_standalone_workingtree('tree')
 
215
        tree.lock_write()
 
216
        try:
 
217
            importer(tree, archive_file)
 
218
            self.assertTrue(tree.path2id('README') is not None)
 
219
            self.assertTrue(tree.path2id('FEEDME') is not None)
 
220
            self.assertTrue(os.path.isfile(tree.abspath('README')))
 
221
            self.assertEqual(tree.inventory[tree.path2id('README')].kind,
 
222
                'file')
 
223
            self.assertEqual(tree.inventory[tree.path2id('FEEDME')].kind,
 
224
                'file')
 
225
            f = file(tree.abspath('junk/food'), 'wb')
 
226
            f.write('I like food\n')
 
227
            f.close()
 
228
 
 
229
            archive_file = self.make_archive2(builder, subdir)
 
230
            importer(tree, archive_file)
 
231
            self.assertTrue(tree.path2id('README') is not None)
 
232
            # Ensure the second version of the file is used.
 
233
            self.assertEqual(tree.get_file_text(tree.path2id('README')),
 
234
                             'Wow?')
 
235
            self.assertTrue(not os.path.exists(tree.abspath('FEEDME')))
 
236
        finally:
 
237
            tree.unlock()
 
238
 
 
239
 
 
240
    def test_untar2(self):
 
241
        tar_file = self.make_messed_tar()
 
242
        tree = BzrDir.create_standalone_workingtree('tree')
 
243
        import_tar(tree, tar_file)
 
244
        self.assertTrue(tree.path2id('project-0.1/README') is not None)
 
245
 
 
246
    def test_untar_gzip(self):
 
247
        tar_file = self.make_tar(mode='w:gz')
 
248
        tree = BzrDir.create_standalone_workingtree('tree')
 
249
        import_tar(tree, tar_file)
 
250
        self.assertTrue(tree.path2id('README') is not None)
 
251
 
 
252
    def test_no_crash_with_bzrdir(self):
 
253
        tar_file = self.make_tar_with_bzrdir()
 
254
        tree = BzrDir.create_standalone_workingtree('tree')
 
255
        import_tar(tree, tar_file)
 
256
        # So long as it did not crash, that should be ok
 
257
 
 
258
 
 
259
class TestWithStuff(TestCaseWithTransport):
 
260
 
 
261
    def transform_to_tar(self, tt):
 
262
        stream = StringIO()
 
263
        tarball = tarfile.open(None, 'w|', stream)
 
264
        export_tarball(tt.get_preview_tree(), tarball, '')
 
265
        return stream
 
266
 
 
267
    def get_empty_tt(self):
 
268
        b = self.make_repository('foo')
 
269
        null_tree = b.revision_tree(_mod_revision.NULL_REVISION)
 
270
        tt = transform.TransformPreview(null_tree)
 
271
        root = tt.new_directory('', transform.ROOT_PARENT, 'tree-root')
 
272
        tt.fixup_new_roots()
 
273
        self.addCleanup(tt.finalize)
 
274
        return tt
 
275
 
 
276
    def test_nonascii_paths(self):
 
277
        self.requireFeature(UnicodeFilenameFeature)
 
278
        tt = self.get_empty_tt()
 
279
        encoded_file = tt.new_file(
 
280
            u'\u1234file', tt.root, 'contents', 'new-file')
 
281
        encoded_file = tt.new_file(
 
282
            'other', tt.root, 'contents', 'other-file')
 
283
        tarfile = self.transform_to_tar(tt)
 
284
        tarfile.seek(0)
 
285
        tree = self.make_branch_and_tree('bar')
 
286
        import_tar(tree, tarfile)
 
287
        self.assertPathExists(u'bar/\u1234file')
 
288
 
 
289
 
 
290
def test_suite():
 
291
    return makeSuite(TestImport)