~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_export.py

  • Committer: Andrew Bennetts
  • Date: 2009-11-25 07:27:43 UTC
  • mto: This revision was merged to the branch mainline in revision 4825.
  • Revision ID: andrew.bennetts@canonical.com-20091125072743-v6sv4m2mkt9iyslp
Terminate SSHSubprocesses when no refs to them are left, in case .close is never called.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
"""Tests for bzrlib.export."""
18
 
 
19
 
from cStringIO import StringIO
20
17
import os
21
 
import tarfile
22
 
import time
23
 
import zipfile
 
18
 
24
19
 
25
20
from bzrlib import (
26
21
    errors,
27
22
    export,
 
23
    osutils,
28
24
    tests,
29
25
    )
30
 
from bzrlib.export import get_root_name
31
 
from bzrlib.export.tar_exporter import export_tarball_generator
32
 
from bzrlib.tests import features
33
 
 
34
 
 
35
 
class TestDirExport(tests.TestCaseWithTransport):
36
 
 
37
 
    def test_missing_file(self):
 
26
 
 
27
 
 
28
class TestExport(tests.TestCaseWithTransport):
 
29
 
 
30
    def test_dir_export_missing_file(self):
38
31
        self.build_tree(['a/', 'a/b', 'a/c'])
39
32
        wt = self.make_branch_and_tree('.')
40
33
        wt.add(['a', 'a/b', 'a/c'])
41
34
        os.unlink('a/c')
42
35
        export.export(wt, 'target', format="dir")
43
 
        self.assertPathExists('target/a/b')
44
 
        self.assertPathDoesNotExist('target/a/c')
45
 
 
46
 
    def test_empty(self):
47
 
        wt = self.make_branch_and_tree('.')
48
 
        export.export(wt, 'target', format="dir")
49
 
        self.assertEquals([], os.listdir("target"))
50
 
 
51
 
    def test_symlink(self):
 
36
        self.failUnlessExists('target/a/b')
 
37
        self.failIfExists('target/a/c')
 
38
 
 
39
    def test_dir_export_symlink(self):
52
40
        self.requireFeature(tests.SymlinkFeature)
53
41
        wt = self.make_branch_and_tree('.')
54
42
        os.symlink('source', 'link')
55
43
        wt.add(['link'])
56
44
        export.export(wt, 'target', format="dir")
57
 
        self.assertPathExists('target/link')
 
45
        self.failUnlessExists('target/link')
58
46
 
59
 
    def test_to_existing_empty_dir_success(self):
 
47
    def test_dir_export_to_existing_empty_dir_success(self):
60
48
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
61
49
        wt = self.make_branch_and_tree('source')
62
50
        wt.add(['a', 'b', 'b/c'])
63
51
        wt.commit('1')
64
52
        self.build_tree(['target/'])
65
53
        export.export(wt, 'target', format="dir")
66
 
        self.assertPathExists('target/a')
67
 
        self.assertPathExists('target/b')
68
 
        self.assertPathExists('target/b/c')
69
 
 
70
 
    def test_empty_subdir(self):
71
 
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
72
 
        wt = self.make_branch_and_tree('source')
73
 
        wt.add(['a', 'b', 'b/c'])
74
 
        wt.commit('1')
75
 
        self.build_tree(['target/'])
76
 
        export.export(wt, 'target', format="dir", subdir='')
77
 
        self.assertPathExists('target/a')
78
 
        self.assertPathExists('target/b')
79
 
        self.assertPathExists('target/b/c')
80
 
 
81
 
    def test_to_existing_nonempty_dir_fail(self):
 
54
        self.failUnlessExists('target/a')
 
55
        self.failUnlessExists('target/b')
 
56
        self.failUnlessExists('target/b/c')
 
57
 
 
58
    def test_dir_export_to_existing_nonempty_dir_fail(self):
82
59
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
83
60
        wt = self.make_branch_and_tree('source')
84
61
        wt.add(['a', 'b', 'b/c'])
85
62
        wt.commit('1')
86
63
        self.build_tree(['target/', 'target/foo'])
87
 
        self.assertRaises(errors.BzrError,
88
 
            export.export, wt, 'target', format="dir")
89
 
 
90
 
    def test_existing_single_file(self):
91
 
        self.build_tree([
92
 
            'dir1/', 'dir1/dir2/', 'dir1/first', 'dir1/dir2/second'])
93
 
        wtree = self.make_branch_and_tree('dir1')
94
 
        wtree.add(['dir2', 'first', 'dir2/second'])
95
 
        wtree.commit('1')
96
 
        export.export(wtree, 'target1', format='dir', subdir='first')
97
 
        self.assertPathExists('target1/first')
98
 
        export.export(wtree, 'target2', format='dir', subdir='dir2/second')
99
 
        self.assertPathExists('target2/second')
100
 
 
101
 
    def test_files_same_timestamp(self):
102
 
        builder = self.make_branch_builder('source')
103
 
        builder.start_series()
104
 
        builder.build_snapshot(None, None, [
105
 
            ('add', ('', 'root-id', 'directory', '')),
106
 
            ('add', ('a', 'a-id', 'file', 'content\n'))])
107
 
        builder.build_snapshot(None, None, [
108
 
            ('add', ('b', 'b-id', 'file', 'content\n'))])
109
 
        builder.finish_series()
110
 
        b = builder.get_branch()
111
 
        b.lock_read()
112
 
        self.addCleanup(b.unlock)
113
 
        tree = b.basis_tree()
114
 
        orig_iter_files_bytes = tree.iter_files_bytes
115
 
 
116
 
        # Make iter_files_bytes slower, so we provoke mtime skew
117
 
        def iter_files_bytes(to_fetch):
118
 
            for thing in orig_iter_files_bytes(to_fetch):
119
 
                yield thing
120
 
                time.sleep(1)
121
 
        tree.iter_files_bytes = iter_files_bytes
122
 
        export.export(tree, 'target', format='dir')
123
 
        t = self.get_transport('target')
124
 
        st_a = t.stat('a')
125
 
        st_b = t.stat('b')
126
 
        # All files must be given the same mtime.
127
 
        self.assertEqual(st_a.st_mtime, st_b.st_mtime)
128
 
 
129
 
    def test_files_per_file_timestamps(self):
130
 
        builder = self.make_branch_builder('source')
131
 
        builder.start_series()
132
 
        # Earliest allowable date on FAT32 filesystems is 1980-01-01
133
 
        a_time = time.mktime((1999, 12, 12, 0, 0, 0, 0, 0, 0))
134
 
        b_time = time.mktime((1980, 01, 01, 0, 0, 0, 0, 0, 0))
135
 
        builder.build_snapshot(None, None, [
136
 
            ('add', ('', 'root-id', 'directory', '')),
137
 
            ('add', ('a', 'a-id', 'file', 'content\n'))],
138
 
            timestamp=a_time)
139
 
        builder.build_snapshot(None, None, [
140
 
            ('add', ('b', 'b-id', 'file', 'content\n'))],
141
 
            timestamp=b_time)
142
 
        builder.finish_series()
143
 
        b = builder.get_branch()
144
 
        b.lock_read()
145
 
        self.addCleanup(b.unlock)
146
 
        tree = b.basis_tree()
147
 
        export.export(tree, 'target', format='dir', per_file_timestamps=True)
148
 
        t = self.get_transport('target')
149
 
        self.assertEqual(a_time, t.stat('a').st_mtime)
150
 
        self.assertEqual(b_time, t.stat('b').st_mtime)
151
 
 
152
 
    def test_subdir_files_per_timestamps(self):
153
 
        builder = self.make_branch_builder('source')
154
 
        builder.start_series()
155
 
        foo_time = time.mktime((1999, 12, 12, 0, 0, 0, 0, 0, 0))
156
 
        builder.build_snapshot(None, None, [
157
 
            ('add', ('', 'root-id', 'directory', '')),
158
 
            ('add', ('subdir', 'subdir-id', 'directory', '')),
159
 
            ('add', ('subdir/foo.txt', 'foo-id', 'file', 'content\n'))],
160
 
            timestamp=foo_time)
161
 
        builder.finish_series()
162
 
        b = builder.get_branch()
163
 
        b.lock_read()
164
 
        self.addCleanup(b.unlock)
165
 
        tree = b.basis_tree()
166
 
        export.export(tree, 'target', format='dir', subdir='subdir',
167
 
            per_file_timestamps=True)
168
 
        t = self.get_transport('target')
169
 
        self.assertEquals(foo_time, t.stat('foo.txt').st_mtime)
170
 
 
171
 
 
172
 
class TarExporterTests(tests.TestCaseWithTransport):
173
 
 
174
 
    def test_xz(self):
175
 
        self.requireFeature(features.lzma)
176
 
        import lzma
177
 
        wt = self.make_branch_and_tree('.')
178
 
        self.build_tree(['a'])
179
 
        wt.add(["a"])
180
 
        wt.commit("1")
181
 
        export.export(wt, 'target.tar.xz', format="txz")
182
 
        tf = tarfile.open(fileobj=lzma.LZMAFile('target.tar.xz'))
183
 
        self.assertEquals(["target/a"], tf.getnames())
184
 
 
185
 
    def test_lzma(self):
186
 
        self.requireFeature(features.lzma)
187
 
        import lzma
188
 
        wt = self.make_branch_and_tree('.')
189
 
        self.build_tree(['a'])
190
 
        wt.add(["a"])
191
 
        wt.commit("1")
192
 
        export.export(wt, 'target.tar.lzma', format="tlzma")
193
 
        tf = tarfile.open(fileobj=lzma.LZMAFile('target.tar.lzma'))
194
 
        self.assertEquals(["target/a"], tf.getnames())
195
 
 
196
 
    def test_tgz(self):
197
 
        wt = self.make_branch_and_tree('.')
198
 
        self.build_tree(['a'])
199
 
        wt.add(["a"])
200
 
        wt.commit("1")
201
 
        export.export(wt, 'target.tar.gz', format="tgz")
202
 
        tf = tarfile.open('target.tar.gz')
203
 
        self.assertEquals(["target/a"], tf.getnames())
204
 
 
205
 
    def test_tgz_ignores_dest_path(self):
206
 
        # The target path should not be a part of the target file.
207
 
        # (bug #102234)
208
 
        wt = self.make_branch_and_tree('.')
209
 
        self.build_tree(['a'])
210
 
        wt.add(["a"])
211
 
        wt.commit("1")
212
 
        os.mkdir("testdir1")
213
 
        os.mkdir("testdir2")
214
 
        export.export(wt, 'testdir1/target.tar.gz', format="tgz",
215
 
            per_file_timestamps=True)
216
 
        export.export(wt, 'testdir2/target.tar.gz', format="tgz",
217
 
            per_file_timestamps=True)
218
 
        file1 = open('testdir1/target.tar.gz', 'r')
219
 
        self.addCleanup(file1.close)
220
 
        file2 = open('testdir1/target.tar.gz', 'r')
221
 
        self.addCleanup(file2.close)
222
 
        content1 = file1.read()
223
 
        content2 = file2.read()
224
 
        self.assertEqualDiff(content1, content2)
225
 
        # the gzip module doesn't have a way to read back to the original
226
 
        # filename, but it's stored as-is in the tarfile.
227
 
        self.assertFalse("testdir1" in content1)
228
 
        self.assertFalse("target.tar.gz" in content1)
229
 
        self.assertTrue("target.tar" in content1)
230
 
 
231
 
    def test_tbz2(self):
232
 
        wt = self.make_branch_and_tree('.')
233
 
        self.build_tree(['a'])
234
 
        wt.add(["a"])
235
 
        wt.commit("1")
236
 
        export.export(wt, 'target.tar.bz2', format="tbz2")
237
 
        tf = tarfile.open('target.tar.bz2')
238
 
        self.assertEquals(["target/a"], tf.getnames())
239
 
 
240
 
    def test_xz_stdout(self):
241
 
        wt = self.make_branch_and_tree('.')
242
 
        self.assertRaises(errors.BzrError, export.export, wt, '-',
243
 
            format="txz")
244
 
 
245
 
    def test_export_tarball_generator(self):
246
 
        wt = self.make_branch_and_tree('.')
247
 
        self.build_tree(['a'])
248
 
        wt.add(["a"])
249
 
        wt.commit("1", timestamp=42)
250
 
        target = StringIO()
251
 
        ball = tarfile.open(None, "w|", target)
252
 
        wt.lock_read()
253
 
        try:
254
 
            for _ in export_tarball_generator(wt, ball, "bar"):
255
 
                pass
256
 
        finally:
257
 
            wt.unlock()
258
 
        # Ball should now be closed.
259
 
        target.seek(0)
260
 
        ball2 = tarfile.open(None, "r", target)
261
 
        self.addCleanup(ball2.close)
262
 
        self.assertEquals(["bar/a"], ball2.getnames())
263
 
 
264
 
 
265
 
class ZipExporterTests(tests.TestCaseWithTransport):
266
 
 
267
 
    def test_per_file_timestamps(self):
268
 
        tree = self.make_branch_and_tree('.')
269
 
        self.build_tree_contents([('har', 'foo')])
270
 
        tree.add('har')
271
 
        # Earliest allowable date on FAT32 filesystems is 1980-01-01
272
 
        timestamp = 347151600
273
 
        tree.commit('setup', timestamp=timestamp)
274
 
        export.export(tree.basis_tree(), 'test.zip', format='zip',
275
 
            per_file_timestamps=True)
276
 
        zfile = zipfile.ZipFile('test.zip')
277
 
        info = zfile.getinfo("test/har")
278
 
        self.assertEquals(time.localtime(timestamp)[:6], info.date_time)
279
 
 
280
 
 
281
 
class RootNameTests(tests.TestCase):
282
 
 
283
 
    def test_root_name(self):
284
 
        self.assertEquals('mytest', get_root_name('../mytest.tar'))
285
 
        self.assertEquals('mytar', get_root_name('mytar.tar'))
286
 
        self.assertEquals('mytar', get_root_name('mytar.tar.bz2'))
287
 
        self.assertEquals('tar.tar.tar', get_root_name('tar.tar.tar.tgz'))
288
 
        self.assertEquals('bzr-0.0.5', get_root_name('bzr-0.0.5.tar.gz'))
289
 
        self.assertEquals('bzr-0.0.5', get_root_name('bzr-0.0.5.zip'))
290
 
        self.assertEquals('bzr-0.0.5', get_root_name('bzr-0.0.5'))
291
 
        self.assertEquals('mytar', get_root_name('a/long/path/mytar.tgz'))
292
 
        self.assertEquals('other',
293
 
            get_root_name('../parent/../dir/other.tbz2'))
294
 
        self.assertEquals('', get_root_name('-'))
 
64
        self.assertRaises(errors.BzrError, export.export, wt, 'target', format="dir")