~abentley/bzrtools/bzrtools.dev

« back to all changes in this revision

Viewing changes to tests/upstream_import.py

  • Committer: Aaron Bentley
  • Date: 2006-03-22 15:19:16 UTC
  • Revision ID: abentley@panoramicfeedback.com-20060322151916-75711de1522d1f68
Tagged BZRTOOLS commands to reduce confusion

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
import os
2
 
from StringIO import StringIO
3
 
from shutil import rmtree, copy2, copytree
4
 
import tarfile
5
 
import tempfile
6
 
from unittest import makeSuite
7
 
 
8
 
from bzrlib import osutils
9
 
from bzrlib.bzrdir import BzrDir
10
 
from bzrlib.plugins.bzrtools.upstream_import import (
11
 
    common_directory,
12
 
    import_archive,
13
 
    import_tar,
14
 
    import_zip,
15
 
    import_dir,
16
 
    top_path,
17
 
    ZipFileWrapper,
18
 
)
19
 
from bzrlib.tests import TestCaseInTempDir
20
 
 
21
 
 
22
 
def import_tar_broken(tree, tar_input):
    """
    Import a tarfile with names that end in //, e.g. Feisty Python 2.5

    Every member whose name already ends in a slash gets one more slash
    appended, and the modified archive is then handed to import_archive.

    :param tree: The tree to import into; passed through to import_archive.
    :param tar_input: File-like object holding the tar data to read.
    """
    tar_file = tarfile.open('lala', 'r', tar_input)
    # getmembers() reads the complete member list.  The bare ``members``
    # attribute is undocumented and is only filled in as the archive is
    # read, so iterating it directly can miss entries.
    for member in tar_file.getmembers():
        if member.name.endswith('/'):
            member.name += '/'
    import_archive(tree, tar_file)
31
 
 
32
 
 
33
 
class DirFileWriter(object):
    """Archive-writer look-alike whose "archive" is a plain directory.

    It provides the add()/close() calls that the archive builders in this
    file use, and stores the path of the directory it fills in the file
    object it was given.
    """

    def __init__(self, fileobj, mode):
        """Create the backing directory and record its path in fileobj.

        :param fileobj: Seekable file-like object.  Any text it already
            holds is treated as the path of an existing tree to start from.
        :param mode: Accepted so the signature matches the other archive
            constructors; it is not used.
        """
        # An 'append' request shows up as fileobj already holding a path.
        # In that case start from a copy of that tree and overwrite later.
        fileobj.seek(0)
        previous_root = fileobj.read()
        fileobj.seek(0)
        new_root = tempfile.mkdtemp(dir=os.getcwd())
        if previous_root != '':
            # copytree insists on creating the destination directory itself
            os.rmdir(new_root)
            copytree(previous_root, new_root)
        fileobj.write(new_root)
        self.root = new_root

    def add(self, path):
        """Copy the file, or create the directory, named by path under root.

        :param path: Path of an existing file or directory; it is joined
            onto self.root to form the destination.
        """
        destination = os.path.join(self.root, path)
        destination_parent = osutils.dirname(destination)
        if not os.path.exists(destination_parent):
            os.makedirs(destination_parent)
        kind = osutils.file_kind(path)
        if kind == 'file':
            copy2(path, destination)
        elif kind == 'directory':
            os.mkdir(destination)

    def close(self):
        """Do nothing; present so callers can close this like an archive."""
62
 
 
63
 
 
64
 
class TestImport(TestCaseInTempDir):
    """Tests for the upstream_import helpers (tar, zip and directory)."""

    def _write_file(self, path, contents):
        """Write contents to path in binary mode.

        The file is closed even when the write fails.
        """
        f = open(path, 'wb')
        try:
            f.write(contents)
        finally:
            f.close()

    def make_tar(self, mode='w'):
        """Return a file object holding the project-0.1 tree as a tar.

        :param mode: tarfile mode string used to open the archive for
            writing.
        """
        def maker(fileobj):
            return tarfile.open('project-0.1.tar', mode, fileobj)
        return self.make_archive(maker)

    def make_archive(self, maker, subdir=True):
        """Build the project-0.1 archive: junk/, README and FEEDME.

        :param maker: Callable taking a file-like object and returning an
            archive writer that offers add(path) and close().
        :param subdir: If true, every entry sits under 'project-0.1/';
            otherwise entries are added from inside that directory with no
            prefix.
        :return: The StringIO given to maker, rewound to the start.
        """
        result = StringIO()
        archive_file = maker(result)
        # Create the scratch directory before entering the try block so the
        # cleanup below never runs against a directory that was not made.
        os.mkdir('project-0.1')
        in_project_dir = False
        try:
            if subdir:
                prefix = 'project-0.1/'
                archive_file.add('project-0.1')
            else:
                prefix = ''
                os.chdir('project-0.1')
                in_project_dir = True
            os.mkdir(prefix + 'junk')
            archive_file.add(prefix + 'junk')

            self._write_file(prefix + 'README', 'What?')
            archive_file.add(prefix + 'README')

            self._write_file(prefix + 'FEEDME', 'Hungry!!')
            archive_file.add(prefix + 'FEEDME')

            archive_file.close()
        finally:
            # Only step back out if we really changed directory; otherwise
            # a failure would move us out of the test directory.
            if in_project_dir:
                os.chdir('..')
            rmtree('project-0.1')
        result.seek(0)
        return result

    def make_archive2(self, builder, subdir):
        """Build the project-0.2 archive: junk/ and README added twice.

        README is written and added once with 'Now?' and then again with
        'Wow?', so the archive carries two entries for the same path.

        :param builder: Callable taking a file-like object and returning an
            archive writer that offers add(path) and close().
        :param subdir: If true, every entry sits under 'project-0.2/';
            otherwise entries are added from inside that directory with no
            prefix.
        :return: The StringIO given to builder, rewound to the start.
        """
        result = StringIO()
        archive_file = builder(result)
        os.mkdir('project-0.2')
        in_project_dir = False
        try:
            if subdir:
                prefix = 'project-0.2/'
                archive_file.add('project-0.2')
            else:
                prefix = ''
                os.chdir('project-0.2')
                in_project_dir = True

            os.mkdir(prefix + 'junk')
            archive_file.add(prefix + 'junk')

            self._write_file(prefix + 'README', 'Now?')
            archive_file.add(prefix + 'README')

            self._write_file(prefix + 'README', 'Wow?')
            # Add a second entry for README with different contents.
            archive_file.add(prefix + 'README')
            archive_file.close()
        finally:
            if in_project_dir:
                os.chdir('..')
            # Remove the scratch tree, as make_archive does for project-0.1.
            rmtree('project-0.2')
        result.seek(0)
        return result

    def make_messed_tar(self):
        """Return a tar with two top-level directories.

        The archive holds project-0.1/, project-0.2/ and project-0.1/README.
        """
        result = StringIO()
        tar_file = tarfile.open('project-0.1.tar', 'w', result)
        os.mkdir('project-0.1')
        tar_file.add('project-0.1')

        os.mkdir('project-0.2')
        tar_file.add('project-0.2')

        self._write_file('project-0.1/README', 'What?')
        tar_file.add('project-0.1/README')
        tar_file.close()
        rmtree('project-0.1')
        # project-0.2 is scratch data too; do not leave it behind.
        rmtree('project-0.2')
        result.seek(0)
        return result

    def make_zip(self):
        """Return a file object holding the project-0.1 tree as a zip."""
        def maker(fileobj):
            return ZipFileWrapper(fileobj, 'w')
        return self.make_archive(maker)

    def make_tar_with_bzrdir(self):
        """Return a tar holding toplevel-dir/ and toplevel-dir/.bzr/."""
        result = StringIO()
        tar_file = tarfile.open('tar-with-bzrdir.tar', 'w', result)
        os.mkdir('toplevel-dir')
        tar_file.add('toplevel-dir')
        os.mkdir('toplevel-dir/.bzr')
        tar_file.add('toplevel-dir/.bzr')
        tar_file.close()
        rmtree('toplevel-dir')
        result.seek(0)
        return result

    def test_top_path(self):
        """top_path returns the first component of a path."""
        self.assertEqual(top_path('ab/b/c'), 'ab')
        self.assertEqual(top_path('etc'), 'etc')
        self.assertEqual(top_path('project-0.1'), 'project-0.1')

    def test_common_directory(self):
        """common_directory returns the shared top directory, or None."""
        self.assertEqual(common_directory(['ab/c/d', 'ab/c/e']), 'ab')
        self.assertIs(common_directory(['ab/c/d', 'ac/c/e']), None)
        self.assertEqual('FEEDME', common_directory(['FEEDME']))

    def test_untar(self):
        """Run the shared archive scenario through import_tar."""
        def builder(fileobj, mode='w'):
            return tarfile.open('project-0.1.tar', mode, fileobj)
        self.archive_test(builder, import_tar)

    def test_broken_tar(self):
        """Run the shared archive scenario through import_tar_broken."""
        def builder(fileobj, mode='w'):
            return tarfile.open('project-0.1.tar', mode, fileobj)
        self.archive_test(builder, import_tar_broken, subdir=True)

    def test_unzip(self):
        """Run the shared archive scenario through import_zip."""
        def builder(fileobj, mode='w'):
            return ZipFileWrapper(fileobj, mode)
        self.archive_test(builder, import_zip)

    def test_copydir_nosub(self):
        """Run the shared archive scenario through import_dir."""
        def builder(fileobj, mode='w'):
            return DirFileWriter(fileobj, mode)
        # It would be bogus to test with the result in a subdirectory,
        # because for directories, the input root is always the output root.
        self.archive_test(builder, import_dir)

    def archive_test(self, builder, importer, subdir=False):
        """Import project-0.1, then project-0.2, and check the tree.

        :param builder: Callable creating an archive writer from a file
            object; handed to make_archive and make_archive2.
        :param importer: Callable taking (tree, archive_file) that performs
            the import under test.
        :param subdir: Passed to make_archive and make_archive2.
        """
        archive_file = self.make_archive(builder, subdir)
        tree = BzrDir.create_standalone_workingtree('tree')
        tree.lock_write()
        try:
            importer(tree, archive_file)
            self.assertTrue(tree.path2id('README') is not None)
            self.assertTrue(tree.path2id('FEEDME') is not None)
            self.assertTrue(os.path.isfile(tree.abspath('README')))
            self.assertEqual(tree.inventory[tree.path2id('README')].kind,
                'file')
            self.assertEqual(tree.inventory[tree.path2id('FEEDME')].kind,
                'file')
            # Put a file into junk/ before the second import runs.
            self._write_file(tree.abspath('junk/food'), 'I like food\n')

            archive_file = self.make_archive2(builder, subdir)
            importer(tree, archive_file)
            self.assertTrue(tree.path2id('README') is not None)
            # Ensure the second version of the file is used.
            self.assertEqual(tree.get_file_text(tree.path2id('README')),
                             'Wow?')
            # FEEDME is absent from the second archive.
            self.assertFalse(os.path.exists(tree.abspath('FEEDME')))
        finally:
            tree.unlock()

    def test_untar2(self):
        """A tar with two top-level directories keeps its full paths."""
        tar_file = self.make_messed_tar()
        tree = BzrDir.create_standalone_workingtree('tree')
        import_tar(tree, tar_file)
        self.assertTrue(tree.path2id('project-0.1/README') is not None)

    def test_untar_gzip(self):
        """A tar written with mode 'w:gz' can be imported."""
        tar_file = self.make_tar(mode='w:gz')
        tree = BzrDir.create_standalone_workingtree('tree')
        import_tar(tree, tar_file)
        self.assertTrue(tree.path2id('README') is not None)

    def test_no_crash_with_bzrdir(self):
        """Importing a tar that contains a .bzr directory does not crash."""
        tar_file = self.make_tar_with_bzrdir()
        tree = BzrDir.create_standalone_workingtree('tree')
        import_tar(tree, tar_file)
        # So long as it did not crash, that should be ok
248
 
 
249
 
def test_suite():
    """Return the unittest suite built from the TestImport test case."""
    suite = makeSuite(TestImport)
    return suite