~abentley/bzrtools/bzrtools.dev

« back to all changes in this revision

Viewing changes to tests/upstream_import.py

  • Committer: Alexander Belchenko
  • Date: 2007-06-12 18:31:58 UTC
  • mto: This revision was merged to the branch mainline in revision 637.
  • Revision ID: bialix@ukr.net-20070612183158-sqt205eb1s910jca
AUTHORS

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
import os
2
 
from StringIO import StringIO
3
 
from shutil import rmtree, copy2, copytree
4
 
import tarfile
5
 
import tempfile
6
 
from unittest import makeSuite
7
 
 
8
 
from bzrlib import (
9
 
    osutils,
10
 
    revision as _mod_revision,
11
 
    transform
12
 
    )
13
 
from bzrlib.bzrdir import BzrDir
14
 
from bzrlib.export import export
15
 
from bzrlib.plugins.bzrtools import errors
16
 
from bzrlib.plugins.bzrtools.upstream_import import (
17
 
    common_directory,
18
 
    get_archive_type,
19
 
    import_archive,
20
 
    import_tar,
21
 
    import_zip,
22
 
    import_dir,
23
 
    top_path,
24
 
    ZipFileWrapper,
25
 
)
26
 
from bzrlib.tests import (
27
 
    TestCaseInTempDir,
28
 
    TestCaseWithTransport,
29
 
    )
30
 
try:
31
 
    from bzrlib.tests.features import UnicodeFilenameFeature
32
 
except ImportError: # bzr < 2.5
33
 
    from bzrlib.tests import UnicodeFilenameFeature
34
 
 
35
 
 
36
 
def import_tar_broken(tree, tar_input):
    """
    Import a tarfile with names that end in //, e.g. Feisty Python 2.5

    Every member whose name already ends in a slash gets a second slash
    appended before the archive is handed to import_archive.

    :param tree: the tree passed through to import_archive.
    :param tar_input: file-like object containing the tar data.
    """
    tar_file = tarfile.open('lala', 'r', tar_input)
    # getmembers() reads the whole archive; the bare `members` attribute only
    # holds the entries that happen to have been read so far.
    for member in tar_file.getmembers():
        if member.name.endswith('/'):
            member.name += '/'
    import_archive(tree, tar_file)
45
 
 
46
 
 
47
 
class DirFileWriter(object):
    """Archive-writer look-alike that stores its entries in a directory.

    It offers the add()/close() calls the archive builders in this module
    use.  The path of the backing directory is written into the supplied
    file object and kept in ``self.root``.
    """

    def __init__(self, fileobj, mode):
        """Create the backing directory and record its path in fileobj.

        :param fileobj: seekable file-like object; any text it already
            holds is treated as the path of an existing tree to start from.
        :param mode: accepted for signature compatibility; not used here.
        """
        # An 'append' request shows up as fileobj already holding a path.
        # In that case start from a copy of that tree and overwrite later.
        fileobj.seek(0)
        previous_root = fileobj.read()
        fileobj.seek(0)
        new_root = tempfile.mkdtemp(dir=os.getcwd())
        if previous_root != '':
            # copytree must create the destination directory itself
            os.rmdir(new_root)
            copytree(previous_root, new_root)
        fileobj.write(new_root)
        self.root = new_root

    def add(self, path):
        """Copy the file or create the directory `path` beneath self.root.

        Missing parent directories of the destination are created first.
        """
        destination = os.path.join(self.root, path)
        destination_parent = osutils.dirname(destination)
        if not os.path.exists(destination_parent):
            os.makedirs(destination_parent)
        kind = osutils.file_kind(path)
        if kind == 'file':
            copy2(path, destination)
        elif kind == 'directory':
            os.mkdir(destination)

    def close(self):
        """Do nothing; present so callers can treat this like an archive."""
        pass
76
 
 
77
 
 
78
 
class TestImport(TestCaseInTempDir):
    """Exercise the upstream-import helpers against small sample archives.

    The ``make_*`` helpers build throwaway archives from files created in
    the current working directory and return them as in-memory file objects
    rewound to the start.
    """

    def _write_file(self, path, contents):
        """Write `contents` to `path` in binary mode, always closing it."""
        f = open(path, 'wb')
        try:
            f.write(contents)
        finally:
            f.close()

    def make_tar(self, mode='w'):
        """Return the sample project-0.1 archive as a tarball.

        :param mode: tarfile mode string, e.g. 'w' or 'w:gz'.
        """
        def maker(fileobj):
            return tarfile.open('project-0.1.tar', mode, fileobj)
        return self.make_archive(maker)

    def make_archive(self, maker, subdir=True):
        """Build the first sample archive (junk/, README, FEEDME).

        :param maker: callable taking a file object and returning an object
            with add(path) and close() methods.
        :param subdir: if True, entries are stored under 'project-0.1/';
            otherwise they are added from inside that directory without a
            prefix.
        :return: the file object handed to `maker`, rewound to the start.
        """
        result = StringIO()
        archive_file = maker(result)
        # Create the scratch directory before entering the try block, so the
        # cleanup below never runs for a directory that was not created.
        os.mkdir('project-0.1')
        entered = False
        try:
            if subdir:
                prefix = 'project-0.1/'
                # Added while still empty, so only the directory entry
                # itself is stored here.
                archive_file.add('project-0.1')
            else:
                prefix = ''
                os.chdir('project-0.1')
                entered = True
            os.mkdir(prefix + 'junk')
            archive_file.add(prefix + 'junk')

            self._write_file(prefix + 'README', 'What?')
            archive_file.add(prefix + 'README')

            self._write_file(prefix + 'FEEDME', 'Hungry!!')
            archive_file.add(prefix + 'FEEDME')

            archive_file.close()
        finally:
            # Only step back out if we actually changed directory.
            if entered:
                os.chdir('..')
            rmtree('project-0.1')
        result.seek(0)
        return result

    def make_archive2(self, builder, subdir):
        """Build the second sample archive (junk/ and README added twice).

        README is added once containing 'Now?' and again containing 'Wow?';
        no FEEDME entry is included.

        :param builder: callable taking a file object and returning an
            object with add(path) and close() methods.
        :param subdir: if True, entries are stored under 'project-0.2/';
            otherwise they are added from inside that directory without a
            prefix.
        :return: the file object handed to `builder`, rewound to the start.
        """
        result = StringIO()
        archive_file = builder(result)
        os.mkdir('project-0.2')
        entered = False
        try:
            if subdir:
                prefix = 'project-0.2/'
                archive_file.add('project-0.2')
            else:
                prefix = ''
                os.chdir('project-0.2')
                entered = True

            os.mkdir(prefix + 'junk')
            archive_file.add(prefix + 'junk')

            self._write_file(prefix + 'README', 'Now?')
            archive_file.add(prefix + 'README')

            self._write_file(prefix + 'README', 'Wow?')
            # Add a second entry for README with different contents.
            archive_file.add(prefix + 'README')
            archive_file.close()
        finally:
            if entered:
                os.chdir('..')
            # Remove the scratch directory, as make_archive does for its own.
            rmtree('project-0.2')
        result.seek(0)
        return result

    def make_messed_tar(self):
        """Return a tarball with two top-level directories.

        It holds 'project-0.1', 'project-0.2' and 'project-0.1/README'.
        """
        result = StringIO()
        tar_file = tarfile.open('project-0.1.tar', 'w', result)
        try:
            os.mkdir('project-0.1')
            tar_file.add('project-0.1')

            os.mkdir('project-0.2')
            tar_file.add('project-0.2')

            self._write_file('project-0.1/README', 'What?')
            tar_file.add('project-0.1/README')
            tar_file.close()
        finally:
            # Clean up both scratch directories, whichever were created.
            for scratch in ('project-0.1', 'project-0.2'):
                if os.path.isdir(scratch):
                    rmtree(scratch)
        result.seek(0)
        return result

    def make_zip(self):
        """Return the sample project-0.1 archive as a zip file."""
        def maker(fileobj):
            return ZipFileWrapper(fileobj, 'w')
        return self.make_archive(maker)

    def make_tar_with_bzrdir(self):
        """Return a tarball holding 'toplevel-dir' and 'toplevel-dir/.bzr'."""
        result = StringIO()
        tar_file = tarfile.open('tar-with-bzrdir.tar', 'w', result)
        try:
            os.mkdir('toplevel-dir')
            tar_file.add('toplevel-dir')
            os.mkdir('toplevel-dir/.bzr')
            tar_file.add('toplevel-dir/.bzr')
            tar_file.close()
        finally:
            if os.path.isdir('toplevel-dir'):
                rmtree('toplevel-dir')
        result.seek(0)
        return result

    def test_top_path(self):
        self.assertEqual(top_path('ab/b/c'), 'ab')
        self.assertEqual(top_path('etc'), 'etc')
        self.assertEqual(top_path('project-0.1'), 'project-0.1')

    def test_common_directory(self):
        self.assertEqual(common_directory(['ab/c/d', 'ab/c/e']), 'ab')
        self.assertIs(common_directory(['ab/c/d', 'ac/c/e']), None)
        self.assertEqual('FEEDME', common_directory(['FEEDME']))

    def test_untar(self):
        def builder(fileobj, mode='w'):
            return tarfile.open('project-0.1.tar', mode, fileobj)
        self.archive_test(builder, import_tar)

    def test_broken_tar(self):
        def builder(fileobj, mode='w'):
            return tarfile.open('project-0.1.tar', mode, fileobj)
        self.archive_test(builder, import_tar_broken, subdir=True)

    def test_unzip(self):
        def builder(fileobj, mode='w'):
            return ZipFileWrapper(fileobj, mode)
        self.archive_test(builder, import_zip)

    def test_copydir_nosub(self):
        def builder(fileobj, mode='w'):
            return DirFileWriter(fileobj, mode)
        # It would be bogus to test with the result in a subdirectory,
        # because for directories, the input root is always the output root.
        self.archive_test(builder, import_dir)

    def archive_test(self, builder, importer, subdir=False):
        """Import two successive archives into a tree and check the result.

        The first archive must leave README and FEEDME versioned as files.
        After the second archive is imported, README must carry the text of
        its last entry ('Wow?') and FEEDME must be gone from disk.

        :param builder: archive factory passed to make_archive and
            make_archive2.
        :param importer: callable(tree, archive_file) under test.
        :param subdir: whether archive entries sit under a top-level
            directory.
        """
        archive_file = self.make_archive(builder, subdir)
        tree = BzrDir.create_standalone_workingtree('tree')
        tree.lock_write()
        try:
            importer(tree, archive_file)
            self.assertTrue(tree.path2id('README') is not None)
            self.assertTrue(tree.path2id('FEEDME') is not None)
            self.assertTrue(os.path.isfile(tree.abspath('README')))
            self.assertEqual(tree.inventory[tree.path2id('README')].kind,
                'file')
            self.assertEqual(tree.inventory[tree.path2id('FEEDME')].kind,
                'file')
            # Drop an extra file into the tree before the second import.
            self._write_file(tree.abspath('junk/food'), 'I like food\n')

            archive_file = self.make_archive2(builder, subdir)
            importer(tree, archive_file)
            self.assertTrue(tree.path2id('README') is not None)
            # Ensure the second version of the file is used.
            self.assertEqual(tree.get_file_text(tree.path2id('README')),
                             'Wow?')
            self.assertFalse(os.path.exists(tree.abspath('FEEDME')))
        finally:
            tree.unlock()

    def test_untar2(self):
        tar_file = self.make_messed_tar()
        tree = BzrDir.create_standalone_workingtree('tree')
        import_tar(tree, tar_file)
        self.assertTrue(tree.path2id('project-0.1/README') is not None)

    def test_untar_gzip(self):
        tar_file = self.make_tar(mode='w:gz')
        tree = BzrDir.create_standalone_workingtree('tree')
        import_tar(tree, tar_file)
        self.assertTrue(tree.path2id('README') is not None)

    def test_no_crash_with_bzrdir(self):
        tar_file = self.make_tar_with_bzrdir()
        tree = BzrDir.create_standalone_workingtree('tree')
        import_tar(tree, tar_file)
        # So long as it did not crash, that should be ok

    def test_get_archive_type(self):
        self.assertEqual(('tar', None), get_archive_type('foo.tar'))
        self.assertEqual(('zip', None), get_archive_type('foo.zip'))
        self.assertRaises(errors.NotArchiveType, get_archive_type, 'foo.gif')
        self.assertEqual(('tar', 'gz'), get_archive_type('foo.tar.gz'))
        self.assertRaises(errors.NotArchiveType, get_archive_type,
                          'foo.zip.gz')
        self.assertEqual(('tar', 'gz'), get_archive_type('foo.tgz'))
        self.assertEqual(('tar', 'lzma'), get_archive_type('foo.tar.lzma'))
        self.assertEqual(('tar', 'lzma'), get_archive_type('foo.tar.xz'))
        self.assertEqual(('tar', 'bz2'), get_archive_type('foo.tar.bz2'))
274
 
 
275
 
 
276
 
class TestWithStuff(TestCaseWithTransport):
    """Import tests whose input tarball is exported from a preview tree."""

    def transform_to_tar(self, tt):
        """Export the preview tree of `tt` as a tar stream.

        :return: the StringIO the tar data was written to; it is not
            rewound.
        """
        buf = StringIO()
        preview_tree = tt.get_preview_tree()
        export(preview_tree, dest=None, format='tar', root='', fileobj=buf)
        return buf

    def get_empty_tt(self):
        """Return a TransformPreview holding only a new root directory.

        The transform is built over the null revision of a fresh repository
        at 'foo', and its finalize() is registered as a cleanup.
        """
        repo = self.make_repository('foo')
        empty_tree = repo.revision_tree(_mod_revision.NULL_REVISION)
        preview = transform.TransformPreview(empty_tree)
        preview.new_directory('', transform.ROOT_PARENT, 'tree-root')
        preview.fixup_new_roots()
        self.addCleanup(preview.finalize)
        return preview

    def test_nonascii_paths(self):
        self.requireFeature(UnicodeFilenameFeature)
        tt = self.get_empty_tt()
        tt.new_file(u'\u1234file', tt.root, 'contents', 'new-file')
        tt.new_file('other', tt.root, 'contents', 'other-file')
        # Local name chosen so it does not hide the tarfile module.
        tar_stream = self.transform_to_tar(tt)
        tar_stream.seek(0)
        tree = self.make_branch_and_tree('bar')
        import_tar(tree, tar_stream)
        self.assertPathExists(u'bar/\u1234file')
305
 
 
306
 
 
307
 
def test_suite():
    """Return a suite holding the tests of every test class in this module."""
    suite = makeSuite(TestImport)
    # Include TestWithStuff as well, so test_nonascii_paths is in the suite.
    suite.addTest(makeSuite(TestWithStuff))
    return suite