~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

Add additional module for lsprof.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005 by Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
import os
 
19
from cStringIO import StringIO
 
20
 
 
21
from bzrlib.errors import (NoSuchFile, FileExists,
 
22
                           TransportNotPossible, ConnectionError)
 
23
from bzrlib.tests import TestCase, TestCaseInTempDir
 
24
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
 
25
from bzrlib.transport import memory, urlescape
 
26
 
 
27
 
 
28
def _append(fn, txt):
 
29
    """Append the given text (file-like object) to the supplied filename."""
 
30
    f = open(fn, 'ab')
 
31
    f.write(txt)
 
32
    f.flush()
 
33
    f.close()
 
34
    del f
 
35
 
 
36
class TestTransport(TestCase):
 
37
    """Test the non transport-concrete class functionality."""
 
38
 
 
39
    def test_urlescape(self):
 
40
        self.assertEqual('%25', urlescape('%'))
 
41
 
 
42
 
 
43
class TestTransportMixIn(object):
 
44
    """Subclass this, and it will provide a series of tests for a Transport.
 
45
    It assumes that the Transport object is connected to the 
 
46
    current working directory.  So that whatever is done 
 
47
    through the transport, should show up in the working 
 
48
    directory, and vice-versa.
 
49
 
 
50
    This also tests to make sure that the functions work with both
 
51
    generators and lists (assuming iter(list) is effectively a generator)
 
52
    """
 
53
    readonly = False
 
54
    def get_transport(self):
 
55
        """Children should override this to return the Transport object.
 
56
        """
 
57
        raise NotImplementedError
 
58
 
 
59
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
60
        """Many transport functions can return generators this makes sure
 
61
        to wrap them in a list() call to make sure the whole generator
 
62
        is run, and that the proper exception is raised.
 
63
        """
 
64
        try:
 
65
            list(func(*args, **kwargs))
 
66
        except excClass:
 
67
            return
 
68
        else:
 
69
            if hasattr(excClass,'__name__'): excName = excClass.__name__
 
70
            else: excName = str(excClass)
 
71
            raise self.failureException, "%s not raised" % excName
 
72
 
 
73
    def test_has(self):
 
74
        t = self.get_transport()
 
75
 
 
76
        files = ['a', 'b', 'e', 'g', '%']
 
77
        self.build_tree(files)
 
78
        self.assertEqual(True, t.has('a'))
 
79
        self.assertEqual(False, t.has('c'))
 
80
        self.assertEqual(True, t.has(urlescape('%')))
 
81
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
 
82
                [True, True, False, False, True, False, True, False])
 
83
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
84
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlescape('%%')]))
 
85
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
 
86
                [True, True, False, False, True, False, True, False])
 
87
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
88
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
 
89
 
 
90
    def test_get(self):
 
91
        t = self.get_transport()
 
92
 
 
93
        files = ['a', 'b', 'e', 'g']
 
94
        self.build_tree(files)
 
95
        self.assertEqual(open('a', 'rb').read(), t.get('a').read())
 
96
        content_f = t.get_multi(files)
 
97
        for path,f in zip(files, content_f):
 
98
            self.assertEqual(open(path).read(), f.read())
 
99
 
 
100
        content_f = t.get_multi(iter(files))
 
101
        for path,f in zip(files, content_f):
 
102
            self.assertEqual(f.read(), open(path).read())
 
103
 
 
104
        self.assertRaises(NoSuchFile, t.get, 'c')
 
105
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
 
106
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
107
 
 
108
    def test_put(self):
 
109
        t = self.get_transport()
 
110
 
 
111
        if self.readonly:
 
112
            self.assertRaises(TransportNotPossible,
 
113
                    t.put, 'a', 'some text for a\n')
 
114
            open('a', 'wb').write('some text for a\n')
 
115
        else:
 
116
            t.put('a', 'some text for a\n')
 
117
        self.assert_(os.path.exists('a'))
 
118
        self.check_file_contents('a', 'some text for a\n')
 
119
        self.assertEqual(t.get('a').read(), 'some text for a\n')
 
120
        # Make sure 'has' is updated
 
121
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
 
122
                [True, False, False, False, False])
 
123
        if self.readonly:
 
124
            self.assertRaises(TransportNotPossible,
 
125
                    t.put_multi,
 
126
                    [('a', 'new\ncontents for\na\n'),
 
127
                        ('d', 'contents\nfor d\n')])
 
128
            open('a', 'wb').write('new\ncontents for\na\n')
 
129
            open('d', 'wb').write('contents\nfor d\n')
 
130
        else:
 
131
            # Put also replaces contents
 
132
            self.assertEqual(t.put_multi([('a', 'new\ncontents for\na\n'),
 
133
                                          ('d', 'contents\nfor d\n')]),
 
134
                             2)
 
135
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
 
136
                [True, False, False, True, False])
 
137
        self.check_file_contents('a', 'new\ncontents for\na\n')
 
138
        self.check_file_contents('d', 'contents\nfor d\n')
 
139
 
 
140
        if self.readonly:
 
141
            self.assertRaises(TransportNotPossible,
 
142
                t.put_multi, iter([('a', 'diff\ncontents for\na\n'),
 
143
                                  ('d', 'another contents\nfor d\n')]))
 
144
            open('a', 'wb').write('diff\ncontents for\na\n')
 
145
            open('d', 'wb').write('another contents\nfor d\n')
 
146
        else:
 
147
            self.assertEqual(
 
148
                t.put_multi(iter([('a', 'diff\ncontents for\na\n'),
 
149
                                  ('d', 'another contents\nfor d\n')]))
 
150
                             , 2)
 
151
        self.check_file_contents('a', 'diff\ncontents for\na\n')
 
152
        self.check_file_contents('d', 'another contents\nfor d\n')
 
153
 
 
154
        if self.readonly:
 
155
            self.assertRaises(TransportNotPossible,
 
156
                    t.put, 'path/doesnt/exist/c', 'contents')
 
157
        else:
 
158
            self.assertRaises(NoSuchFile,
 
159
                    t.put, 'path/doesnt/exist/c', 'contents')
 
160
 
 
161
    def test_put_file(self):
 
162
        t = self.get_transport()
 
163
 
 
164
        # Test that StringIO can be used as a file-like object with put
 
165
        f1 = StringIO('this is a string\nand some more stuff\n')
 
166
        if self.readonly:
 
167
            open('f1', 'wb').write(f1.read())
 
168
        else:
 
169
            t.put('f1', f1)
 
170
 
 
171
        del f1
 
172
 
 
173
        self.check_file_contents('f1', 
 
174
                'this is a string\nand some more stuff\n')
 
175
 
 
176
        f2 = StringIO('here is some text\nand a bit more\n')
 
177
        f3 = StringIO('some text for the\nthird file created\n')
 
178
 
 
179
        if self.readonly:
 
180
            open('f2', 'wb').write(f2.read())
 
181
            open('f3', 'wb').write(f3.read())
 
182
        else:
 
183
            t.put_multi([('f2', f2), ('f3', f3)])
 
184
 
 
185
        del f2, f3
 
186
 
 
187
        self.check_file_contents('f2', 'here is some text\nand a bit more\n')
 
188
        self.check_file_contents('f3', 'some text for the\nthird file created\n')
 
189
 
 
190
        # Test that an actual file object can be used with put
 
191
        f4 = open('f1', 'rb')
 
192
        if self.readonly:
 
193
            open('f4', 'wb').write(f4.read())
 
194
        else:
 
195
            t.put('f4', f4)
 
196
 
 
197
        del f4
 
198
 
 
199
        self.check_file_contents('f4', 
 
200
                'this is a string\nand some more stuff\n')
 
201
 
 
202
        f5 = open('f2', 'rb')
 
203
        f6 = open('f3', 'rb')
 
204
        if self.readonly:
 
205
            open('f5', 'wb').write(f5.read())
 
206
            open('f6', 'wb').write(f6.read())
 
207
        else:
 
208
            t.put_multi([('f5', f5), ('f6', f6)])
 
209
 
 
210
        del f5, f6
 
211
 
 
212
        self.check_file_contents('f5', 'here is some text\nand a bit more\n')
 
213
        self.check_file_contents('f6', 'some text for the\nthird file created\n')
 
214
 
 
215
    def test_mkdir(self):
 
216
        t = self.get_transport()
 
217
 
 
218
        # Test mkdir
 
219
        os.mkdir('dir_a')
 
220
        self.assertEqual(t.has('dir_a'), True)
 
221
        self.assertEqual(t.has('dir_b'), False)
 
222
 
 
223
        if self.readonly:
 
224
            self.assertRaises(TransportNotPossible,
 
225
                    t.mkdir, 'dir_b')
 
226
            os.mkdir('dir_b')
 
227
        else:
 
228
            t.mkdir('dir_b')
 
229
        self.assertEqual(t.has('dir_b'), True)
 
230
        self.assert_(os.path.isdir('dir_b'))
 
231
 
 
232
        if self.readonly:
 
233
            self.assertRaises(TransportNotPossible,
 
234
                    t.mkdir_multi, ['dir_c', 'dir_d'])
 
235
            os.mkdir('dir_c')
 
236
            os.mkdir('dir_d')
 
237
        else:
 
238
            t.mkdir_multi(['dir_c', 'dir_d'])
 
239
 
 
240
        if self.readonly:
 
241
            self.assertRaises(TransportNotPossible,
 
242
                    t.mkdir_multi, iter(['dir_e', 'dir_f']))
 
243
            os.mkdir('dir_e')
 
244
            os.mkdir('dir_f')
 
245
        else:
 
246
            t.mkdir_multi(iter(['dir_e', 'dir_f']))
 
247
        self.assertEqual(list(t.has_multi(
 
248
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
 
249
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
 
250
            [True, True, True, False,
 
251
             True, True, True, True])
 
252
        for d in ['dir_a', 'dir_b', 'dir_c', 'dir_d', 'dir_e', 'dir_f']:
 
253
            self.assert_(os.path.isdir(d))
 
254
 
 
255
        if not self.readonly:
 
256
            self.assertRaises(NoSuchFile, t.mkdir, 'path/doesnt/exist')
 
257
            self.assertRaises(FileExists, t.mkdir, 'dir_a') # Creating a directory again should fail
 
258
 
 
259
        # Make sure the transport recognizes when a
 
260
        # directory is created by other means
 
261
        # Caching Transports will fail, because dir_e was already seen not
 
262
        # to exist. So instead, we will search for a new directory
 
263
        #os.mkdir('dir_e')
 
264
        #if not self.readonly:
 
265
        #    self.assertRaises(FileExists, t.mkdir, 'dir_e')
 
266
 
 
267
        os.mkdir('dir_g')
 
268
        if not self.readonly:
 
269
            self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
270
 
 
271
        # Test get/put in sub-directories
 
272
        if self.readonly:
 
273
            open('dir_a/a', 'wb').write('contents of dir_a/a')
 
274
            open('dir_b/b', 'wb').write('contents of dir_b/b')
 
275
        else:
 
276
            self.assertEqual(
 
277
                t.put_multi([('dir_a/a', 'contents of dir_a/a'),
 
278
                             ('dir_b/b', 'contents of dir_b/b')])
 
279
                          , 2)
 
280
        for f in ('dir_a/a', 'dir_b/b'):
 
281
            self.assertEqual(t.get(f).read(), open(f).read())
 
282
 
 
283
    def test_copy_to(self):
 
284
        import tempfile
 
285
        from bzrlib.transport.local import LocalTransport
 
286
 
 
287
        t = self.get_transport()
 
288
 
 
289
        files = ['a', 'b', 'c', 'd']
 
290
        self.build_tree(files)
 
291
 
 
292
        dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
 
293
        dtmp_base = os.path.basename(dtmp)
 
294
        local_t = LocalTransport(dtmp)
 
295
 
 
296
        t.copy_to(files, local_t)
 
297
        for f in files:
 
298
            self.assertEquals(open(f).read(),
 
299
                    open(os.path.join(dtmp_base, f)).read())
 
300
 
 
301
        # Test that copying into a missing directory raises
 
302
        # NoSuchFile
 
303
        os.mkdir('e')
 
304
        open('e/f', 'wb').write('contents of e')
 
305
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], local_t)
 
306
 
 
307
        os.mkdir(os.path.join(dtmp_base, 'e'))
 
308
        t.copy_to(['e/f'], local_t)
 
309
 
 
310
        del dtmp, dtmp_base, local_t
 
311
 
 
312
        dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
 
313
        dtmp_base = os.path.basename(dtmp)
 
314
        local_t = LocalTransport(dtmp)
 
315
 
 
316
        files = ['a', 'b', 'c', 'd']
 
317
        t.copy_to(iter(files), local_t)
 
318
        for f in files:
 
319
            self.assertEquals(open(f).read(),
 
320
                    open(os.path.join(dtmp_base, f)).read())
 
321
 
 
322
        del dtmp, dtmp_base, local_t
 
323
 
 
324
    def test_append(self):
 
325
        t = self.get_transport()
 
326
 
 
327
        if self.readonly:
 
328
            open('a', 'wb').write('diff\ncontents for\na\n')
 
329
            open('b', 'wb').write('contents\nfor b\n')
 
330
        else:
 
331
            t.put_multi([
 
332
                    ('a', 'diff\ncontents for\na\n'),
 
333
                    ('b', 'contents\nfor b\n')
 
334
                    ])
 
335
 
 
336
        if self.readonly:
 
337
            self.assertRaises(TransportNotPossible,
 
338
                    t.append, 'a', 'add\nsome\nmore\ncontents\n')
 
339
            _append('a', 'add\nsome\nmore\ncontents\n')
 
340
        else:
 
341
            t.append('a', 'add\nsome\nmore\ncontents\n')
 
342
 
 
343
        self.check_file_contents('a', 
 
344
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n')
 
345
 
 
346
        if self.readonly:
 
347
            self.assertRaises(TransportNotPossible,
 
348
                    t.append_multi,
 
349
                        [('a', 'and\nthen\nsome\nmore\n'),
 
350
                         ('b', 'some\nmore\nfor\nb\n')])
 
351
            _append('a', 'and\nthen\nsome\nmore\n')
 
352
            _append('b', 'some\nmore\nfor\nb\n')
 
353
        else:
 
354
            t.append_multi([('a', 'and\nthen\nsome\nmore\n'),
 
355
                    ('b', 'some\nmore\nfor\nb\n')])
 
356
        self.check_file_contents('a', 
 
357
            'diff\ncontents for\na\n'
 
358
            'add\nsome\nmore\ncontents\n'
 
359
            'and\nthen\nsome\nmore\n')
 
360
        self.check_file_contents('b', 
 
361
                'contents\nfor b\n'
 
362
                'some\nmore\nfor\nb\n')
 
363
 
 
364
        if self.readonly:
 
365
            _append('a', 'a little bit more\n')
 
366
            _append('b', 'from an iterator\n')
 
367
        else:
 
368
            t.append_multi(iter([('a', 'a little bit more\n'),
 
369
                    ('b', 'from an iterator\n')]))
 
370
        self.check_file_contents('a', 
 
371
            'diff\ncontents for\na\n'
 
372
            'add\nsome\nmore\ncontents\n'
 
373
            'and\nthen\nsome\nmore\n'
 
374
            'a little bit more\n')
 
375
        self.check_file_contents('b', 
 
376
                'contents\nfor b\n'
 
377
                'some\nmore\nfor\nb\n'
 
378
                'from an iterator\n')
 
379
 
 
380
        if self.readonly:
 
381
            _append('c', 'some text\nfor a missing file\n')
 
382
            _append('a', 'some text in a\n')
 
383
            _append('d', 'missing file r\n')
 
384
        else:
 
385
            t.append('c', 'some text\nfor a missing file\n')
 
386
            t.append_multi([('a', 'some text in a\n'),
 
387
                            ('d', 'missing file r\n')])
 
388
        self.check_file_contents('a', 
 
389
            'diff\ncontents for\na\n'
 
390
            'add\nsome\nmore\ncontents\n'
 
391
            'and\nthen\nsome\nmore\n'
 
392
            'a little bit more\n'
 
393
            'some text in a\n')
 
394
        self.check_file_contents('c', 'some text\nfor a missing file\n')
 
395
        self.check_file_contents('d', 'missing file r\n')
 
396
 
 
397
    def test_append_file(self):
 
398
        t = self.get_transport()
 
399
 
 
400
        contents = [
 
401
            ('f1', 'this is a string\nand some more stuff\n'),
 
402
            ('f2', 'here is some text\nand a bit more\n'),
 
403
            ('f3', 'some text for the\nthird file created\n'),
 
404
            ('f4', 'this is a string\nand some more stuff\n'),
 
405
            ('f5', 'here is some text\nand a bit more\n'),
 
406
            ('f6', 'some text for the\nthird file created\n')
 
407
        ]
 
408
        
 
409
        if self.readonly:
 
410
            for f, val in contents:
 
411
                open(f, 'wb').write(val)
 
412
        else:
 
413
            t.put_multi(contents)
 
414
 
 
415
        a1 = StringIO('appending to\none\n')
 
416
        if self.readonly:
 
417
            _append('f1', a1.read())
 
418
        else:
 
419
            t.append('f1', a1)
 
420
 
 
421
        del a1
 
422
 
 
423
        self.check_file_contents('f1', 
 
424
                'this is a string\nand some more stuff\n'
 
425
                'appending to\none\n')
 
426
 
 
427
        a2 = StringIO('adding more\ntext to two\n')
 
428
        a3 = StringIO('some garbage\nto put in three\n')
 
429
 
 
430
        if self.readonly:
 
431
            _append('f2', a2.read())
 
432
            _append('f3', a3.read())
 
433
        else:
 
434
            t.append_multi([('f2', a2), ('f3', a3)])
 
435
 
 
436
        del a2, a3
 
437
 
 
438
        self.check_file_contents('f2',
 
439
                'here is some text\nand a bit more\n'
 
440
                'adding more\ntext to two\n')
 
441
        self.check_file_contents('f3', 
 
442
                'some text for the\nthird file created\n'
 
443
                'some garbage\nto put in three\n')
 
444
 
 
445
        # Test that an actual file object can be used with put
 
446
        a4 = open('f1', 'rb')
 
447
        if self.readonly:
 
448
            _append('f4', a4.read())
 
449
        else:
 
450
            t.append('f4', a4)
 
451
 
 
452
        del a4
 
453
 
 
454
        self.check_file_contents('f4', 
 
455
                'this is a string\nand some more stuff\n'
 
456
                'this is a string\nand some more stuff\n'
 
457
                'appending to\none\n')
 
458
 
 
459
        a5 = open('f2', 'rb')
 
460
        a6 = open('f3', 'rb')
 
461
        if self.readonly:
 
462
            _append('f5', a5.read())
 
463
            _append('f6', a6.read())
 
464
        else:
 
465
            t.append_multi([('f5', a5), ('f6', a6)])
 
466
 
 
467
        del a5, a6
 
468
 
 
469
        self.check_file_contents('f5',
 
470
                'here is some text\nand a bit more\n'
 
471
                'here is some text\nand a bit more\n'
 
472
                'adding more\ntext to two\n')
 
473
        self.check_file_contents('f6',
 
474
                'some text for the\nthird file created\n'
 
475
                'some text for the\nthird file created\n'
 
476
                'some garbage\nto put in three\n')
 
477
 
 
478
        a5 = open('f2', 'rb')
 
479
        a6 = open('f2', 'rb')
 
480
        a7 = open('f3', 'rb')
 
481
        if self.readonly:
 
482
            _append('c', a5.read())
 
483
            _append('a', a6.read())
 
484
            _append('d', a7.read())
 
485
        else:
 
486
            t.append('c', a5)
 
487
            t.append_multi([('a', a6), ('d', a7)])
 
488
        del a5, a6, a7
 
489
        self.check_file_contents('c', open('f2', 'rb').read())
 
490
        self.check_file_contents('d', open('f3', 'rb').read())
 
491
 
 
492
 
 
493
    def test_delete(self):
 
494
        # TODO: Test Transport.delete
 
495
        t = self.get_transport()
 
496
 
 
497
        # Not much to do with a readonly transport
 
498
        if self.readonly:
 
499
            return
 
500
 
 
501
        open('a', 'wb').write('a little bit of text\n')
 
502
        self.failUnless(t.has('a'))
 
503
        self.failUnlessExists('a')
 
504
        t.delete('a')
 
505
        self.failIf(os.path.lexists('a'))
 
506
 
 
507
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
508
 
 
509
        open('a', 'wb').write('a text\n')
 
510
        open('b', 'wb').write('b text\n')
 
511
        open('c', 'wb').write('c text\n')
 
512
        self.assertEqual([True, True, True],
 
513
                list(t.has_multi(['a', 'b', 'c'])))
 
514
        t.delete_multi(['a', 'c'])
 
515
        self.assertEqual([False, True, False],
 
516
                list(t.has_multi(['a', 'b', 'c'])))
 
517
        self.failIf(os.path.lexists('a'))
 
518
        self.failUnlessExists('b')
 
519
        self.failIf(os.path.lexists('c'))
 
520
 
 
521
        self.assertRaises(NoSuchFile,
 
522
                t.delete_multi, ['a', 'b', 'c'])
 
523
 
 
524
        self.assertRaises(NoSuchFile,
 
525
                t.delete_multi, iter(['a', 'b', 'c']))
 
526
 
 
527
        open('a', 'wb').write('another a text\n')
 
528
        open('c', 'wb').write('another c text\n')
 
529
        t.delete_multi(iter(['a', 'b', 'c']))
 
530
 
 
531
        # We should have deleted everything
 
532
        # SftpServer creates control files in the
 
533
        # working directory, so we can just do a
 
534
        # plain "listdir".
 
535
        # self.assertEqual([], os.listdir('.'))
 
536
 
 
537
    def test_move(self):
 
538
        t = self.get_transport()
 
539
 
 
540
        if self.readonly:
 
541
            return
 
542
 
 
543
        # TODO: I would like to use os.listdir() to
 
544
        # make sure there are no extra files, but SftpServer
 
545
        # creates control files in the working directory
 
546
        # perhaps all of this could be done in a subdirectory
 
547
 
 
548
        open('a', 'wb').write('a first file\n')
 
549
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
550
 
 
551
        t.move('a', 'b')
 
552
        self.failUnlessExists('b')
 
553
        self.failIf(os.path.lexists('a'))
 
554
 
 
555
        self.check_file_contents('b', 'a first file\n')
 
556
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
557
 
 
558
        # Overwrite a file
 
559
        open('c', 'wb').write('c this file\n')
 
560
        t.move('c', 'b')
 
561
        self.failIf(os.path.lexists('c'))
 
562
        self.check_file_contents('b', 'c this file\n')
 
563
 
 
564
        # TODO: Try to write a test for atomicity
 
565
        # TODO: Test moving into a non-existant subdirectory
 
566
        # TODO: Test Transport.move_multi
 
567
 
 
568
    def test_copy(self):
 
569
        t = self.get_transport()
 
570
 
 
571
        if self.readonly:
 
572
            return
 
573
 
 
574
        open('a', 'wb').write('a file\n')
 
575
        t.copy('a', 'b')
 
576
        self.check_file_contents('b', 'a file\n')
 
577
 
 
578
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
579
        os.mkdir('c')
 
580
        # What should the assert be if you try to copy a
 
581
        # file over a directory?
 
582
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
583
        open('d', 'wb').write('text in d\n')
 
584
        t.copy('d', 'b')
 
585
        self.check_file_contents('b', 'text in d\n')
 
586
 
 
587
        # TODO: test copy_multi
 
588
 
 
589
    def test_connection_error(self):
 
590
        """ConnectionError is raised when connection is impossible"""
 
591
        if not hasattr(self, "get_bogus_transport"):
 
592
            return
 
593
        t = self.get_bogus_transport()
 
594
        try:
 
595
            t.get('.bzr/branch')
 
596
        except (ConnectionError, NoSuchFile), e:
 
597
            pass
 
598
        except (Exception), e:
 
599
            self.failIf(True, 'Wrong exception thrown: %s' % e)
 
600
        else:
 
601
            self.failIf(True, 'Did not get the expected exception.')
 
602
 
 
603
    def test_stat(self):
 
604
        # TODO: Test stat, just try once, and if it throws, stop testing
 
605
        from stat import S_ISDIR, S_ISREG
 
606
 
 
607
        t = self.get_transport()
 
608
 
 
609
        try:
 
610
            st = t.stat('.')
 
611
        except TransportNotPossible, e:
 
612
            # This transport cannot stat
 
613
            return
 
614
 
 
615
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
616
        self.build_tree(paths)
 
617
 
 
618
        local_stats = []
 
619
 
 
620
        for p in paths:
 
621
            st = t.stat(p)
 
622
            local_st = os.stat(p)
 
623
            if p.endswith('/'):
 
624
                self.failUnless(S_ISDIR(st.st_mode))
 
625
            else:
 
626
                self.failUnless(S_ISREG(st.st_mode))
 
627
            self.assertEqual(local_st.st_size, st.st_size)
 
628
            self.assertEqual(local_st.st_mode, st.st_mode)
 
629
            local_stats.append(local_st)
 
630
 
 
631
        remote_stats = list(t.stat_multi(paths))
 
632
        remote_iter_stats = list(t.stat_multi(iter(paths)))
 
633
 
 
634
        for local, remote, remote_iter in \
 
635
            zip(local_stats, remote_stats, remote_iter_stats):
 
636
            self.assertEqual(local.st_mode, remote.st_mode)
 
637
            self.assertEqual(local.st_mode, remote_iter.st_mode)
 
638
 
 
639
            self.assertEqual(local.st_size, remote.st_size)
 
640
            self.assertEqual(local.st_size, remote_iter.st_size)
 
641
            # Should we test UID/GID?
 
642
 
 
643
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
644
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
645
 
 
646
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
 
647
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
 
648
 
 
649
    def test_list_dir(self):
 
650
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
651
        t = self.get_transport()
 
652
        
 
653
        if not t.listable():
 
654
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
655
            return
 
656
 
 
657
        def sorted_list(d):
 
658
            l = list(t.list_dir(d))
 
659
            l.sort()
 
660
            return l
 
661
 
 
662
        # SftpServer creates control files in the working directory
 
663
        # so lets move down a directory to be safe
 
664
        os.mkdir('wd')
 
665
        os.chdir('wd')
 
666
        t = t.clone('wd')
 
667
 
 
668
        self.assertEqual([], sorted_list(u'.'))
 
669
        self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e'])
 
670
 
 
671
        self.assertEqual([u'a', u'b', u'c'], sorted_list(u'.'))
 
672
        self.assertEqual([u'd', u'e'], sorted_list(u'c'))
 
673
 
 
674
        os.remove('c/d')
 
675
        os.remove('b')
 
676
        self.assertEqual([u'a', u'c'], sorted_list('.'))
 
677
        self.assertEqual([u'e'], sorted_list(u'c'))
 
678
 
 
679
        self.assertListRaises(NoSuchFile, t.list_dir, 'q')
 
680
        self.assertListRaises(NoSuchFile, t.list_dir, 'c/f')
 
681
        self.assertListRaises(NoSuchFile, t.list_dir, 'a')
 
682
 
 
683
    def test_clone(self):
 
684
        # TODO: Test that clone moves up and down the filesystem
 
685
        t1 = self.get_transport()
 
686
 
 
687
        self.build_tree(['a', 'b/', 'b/c'])
 
688
 
 
689
        self.failUnless(t1.has('a'))
 
690
        self.failUnless(t1.has('b/c'))
 
691
        self.failIf(t1.has('c'))
 
692
 
 
693
        t2 = t1.clone('b')
 
694
        self.failUnless(t2.has('c'))
 
695
        self.failIf(t2.has('a'))
 
696
 
 
697
        t3 = t2.clone('..')
 
698
        self.failUnless(t3.has('a'))
 
699
        self.failIf(t3.has('c'))
 
700
 
 
701
        self.failIf(t1.has('b/d'))
 
702
        self.failIf(t2.has('d'))
 
703
        self.failIf(t3.has('b/d'))
 
704
 
 
705
        if self.readonly:
 
706
            open('b/d', 'wb').write('newfile\n')
 
707
        else:
 
708
            t2.put('d', 'newfile\n')
 
709
 
 
710
        self.failUnless(t1.has('b/d'))
 
711
        self.failUnless(t2.has('d'))
 
712
        self.failUnless(t3.has('b/d'))
 
713
 
 
714
        
 
715
class LocalTransportTest(TestCaseInTempDir, TestTransportMixIn):
 
716
    def get_transport(self):
 
717
        from bzrlib.transport.local import LocalTransport
 
718
        return LocalTransport(u'.')
 
719
 
 
720
 
 
721
class HttpTransportTest(TestCaseWithWebserver, TestTransportMixIn):
 
722
 
 
723
    readonly = True
 
724
 
 
725
    def get_transport(self):
 
726
        from bzrlib.transport.http import HttpTransport
 
727
        url = self.get_remote_url(u'.')
 
728
        return HttpTransport(url)
 
729
 
 
730
    def get_bogus_transport(self):
 
731
        from bzrlib.transport.http import HttpTransport
 
732
        return HttpTransport('http://jasldkjsalkdjalksjdkljasd')
 
733
 
 
734
 
 
735
class TestMemoryTransport(TestCase):
 
736
 
 
737
    def test_get_transport(self):
 
738
        memory.MemoryTransport()
 
739
 
 
740
    def test_clone(self):
 
741
        transport = memory.MemoryTransport()
 
742
        self.failUnless(transport.clone() is transport)
 
743
 
 
744
    def test_abspath(self):
 
745
        transport = memory.MemoryTransport()
 
746
        self.assertEqual("in-memory:relpath", transport.abspath('relpath'))
 
747
 
 
748
    def test_relpath(self):
 
749
        transport = memory.MemoryTransport()
 
750
 
 
751
    def test_append_and_get(self):
 
752
        transport = memory.MemoryTransport()
 
753
        transport.append('path', StringIO('content'))
 
754
        self.assertEqual(transport.get('path').read(), 'content')
 
755
        transport.append('path', StringIO('content'))
 
756
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
757
 
 
758
    def test_put_and_get(self):
 
759
        transport = memory.MemoryTransport()
 
760
        transport.put('path', StringIO('content'))
 
761
        self.assertEqual(transport.get('path').read(), 'content')
 
762
        transport.put('path', StringIO('content'))
 
763
        self.assertEqual(transport.get('path').read(), 'content')
 
764
 
 
765
    def test_append_without_dir_fails(self):
 
766
        transport = memory.MemoryTransport()
 
767
        self.assertRaises(NoSuchFile,
 
768
                          transport.append, 'dir/path', StringIO('content'))
 
769
 
 
770
    def test_put_without_dir_fails(self):
 
771
        transport = memory.MemoryTransport()
 
772
        self.assertRaises(NoSuchFile,
 
773
                          transport.put, 'dir/path', StringIO('content'))
 
774
 
 
775
    def test_get_missing(self):
 
776
        transport = memory.MemoryTransport()
 
777
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
778
 
 
779
    def test_has_missing(self):
 
780
        transport = memory.MemoryTransport()
 
781
        self.assertEquals(False, transport.has('foo'))
 
782
 
 
783
    def test_has_present(self):
 
784
        transport = memory.MemoryTransport()
 
785
        transport.append('foo', StringIO('content'))
 
786
        self.assertEquals(True, transport.has('foo'))
 
787
 
 
788
    def test_mkdir(self):
 
789
        transport = memory.MemoryTransport()
 
790
        transport.mkdir('dir')
 
791
        transport.append('dir/path', StringIO('content'))
 
792
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
793
 
 
794
    def test_mkdir_missing_parent(self):
 
795
        transport = memory.MemoryTransport()
 
796
        self.assertRaises(NoSuchFile,
 
797
                          transport.mkdir, 'dir/dir')
 
798
 
 
799
    def test_mkdir_twice(self):
 
800
        transport = memory.MemoryTransport()
 
801
        transport.mkdir('dir')
 
802
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
803
 
 
804
    def test_parameters(self):
 
805
        transport = memory.MemoryTransport()
 
806
        self.assertEqual(True, transport.listable())
 
807
        self.assertEqual(False, transport.should_cache())
 
808
 
 
809
    def test_iter_files_recursive(self):
 
810
        transport = memory.MemoryTransport()
 
811
        transport.mkdir('dir')
 
812
        transport.put('dir/foo', StringIO('content'))
 
813
        transport.put('dir/bar', StringIO('content'))
 
814
        transport.put('bar', StringIO('content'))
 
815
        paths = set(transport.iter_files_recursive())
 
816
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
817
 
 
818
    def test_stat(self):
 
819
        transport = memory.MemoryTransport()
 
820
        transport.put('foo', StringIO('content'))
 
821
        transport.put('bar', StringIO('phowar'))
 
822
        self.assertEqual(7, transport.stat('foo').st_size)
 
823
        self.assertEqual(6, transport.stat('bar').st_size)
 
824