~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

Merged mailine

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
 
18
18
import os
 
19
import sys
 
20
import stat
19
21
from cStringIO import StringIO
20
22
 
21
23
from bzrlib.errors import (NoSuchFile, FileExists,
22
24
                           TransportNotPossible, ConnectionError)
23
 
from bzrlib.tests import TestCase, TestCaseInTempDir
24
 
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
25
 
from bzrlib.transport import memory, urlescape
26
 
 
27
 
 
28
 
def _append(fn, txt):
29
 
    """Append the given text (file-like object) to the supplied filename."""
30
 
    f = open(fn, 'ab')
31
 
    f.write(txt)
32
 
    f.flush()
33
 
    f.close()
34
 
    del f
 
25
from bzrlib.tests import TestCase
 
26
from bzrlib.transport import (_get_protocol_handlers,
 
27
                              _get_transport_modules,
 
28
                              register_lazy_transport,
 
29
                              _set_protocol_handlers,
 
30
                              urlescape,
 
31
                              )
 
32
 
35
33
 
36
34
class TestTransport(TestCase):
37
35
    """Test the non transport-concrete class functionality."""
39
37
    def test_urlescape(self):
40
38
        self.assertEqual('%25', urlescape('%'))
41
39
 
42
 
 
43
 
class TestTransportMixIn(object):
44
 
    """Subclass this, and it will provide a series of tests for a Transport.
45
 
    It assumes that the Transport object is connected to the 
46
 
    current working directory.  So that whatever is done 
47
 
    through the transport, should show up in the working 
48
 
    directory, and vice-versa.
49
 
 
50
 
    This also tests to make sure that the functions work with both
51
 
    generators and lists (assuming iter(list) is effectively a generator)
52
 
    """
53
 
    readonly = False
54
 
    def get_transport(self):
55
 
        """Children should override this to return the Transport object.
56
 
        """
57
 
        raise NotImplementedError
58
 
 
59
 
    def assertListRaises(self, excClass, func, *args, **kwargs):
60
 
        """Many transport functions can return generators this makes sure
61
 
        to wrap them in a list() call to make sure the whole generator
62
 
        is run, and that the proper exception is raised.
63
 
        """
64
 
        try:
65
 
            list(func(*args, **kwargs))
66
 
        except excClass:
67
 
            return
68
 
        else:
69
 
            if hasattr(excClass,'__name__'): excName = excClass.__name__
70
 
            else: excName = str(excClass)
71
 
            raise self.failureException, "%s not raised" % excName
72
 
 
73
 
    def test_has(self):
74
 
        t = self.get_transport()
75
 
 
76
 
        files = ['a', 'b', 'e', 'g', '%']
77
 
        self.build_tree(files)
78
 
        self.assertEqual(True, t.has('a'))
79
 
        self.assertEqual(False, t.has('c'))
80
 
        self.assertEqual(True, t.has(urlescape('%')))
81
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
82
 
                [True, True, False, False, True, False, True, False])
83
 
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
84
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlescape('%%')]))
85
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
86
 
                [True, True, False, False, True, False, True, False])
87
 
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
88
 
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
89
 
 
90
 
    def test_get(self):
91
 
        t = self.get_transport()
92
 
 
93
 
        files = ['a', 'b', 'e', 'g']
94
 
        self.build_tree(files)
95
 
        self.assertEqual(open('a', 'rb').read(), t.get('a').read())
96
 
        content_f = t.get_multi(files)
97
 
        for path,f in zip(files, content_f):
98
 
            self.assertEqual(open(path).read(), f.read())
99
 
 
100
 
        content_f = t.get_multi(iter(files))
101
 
        for path,f in zip(files, content_f):
102
 
            self.assertEqual(f.read(), open(path).read())
103
 
 
104
 
        self.assertRaises(NoSuchFile, t.get, 'c')
105
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
106
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
107
 
 
108
 
    def test_put(self):
109
 
        t = self.get_transport()
110
 
 
111
 
        if self.readonly:
112
 
            self.assertRaises(TransportNotPossible,
113
 
                    t.put, 'a', 'some text for a\n')
114
 
            open('a', 'wb').write('some text for a\n')
115
 
        else:
116
 
            t.put('a', 'some text for a\n')
117
 
        self.assert_(os.path.exists('a'))
118
 
        self.check_file_contents('a', 'some text for a\n')
119
 
        self.assertEqual(t.get('a').read(), 'some text for a\n')
120
 
        # Make sure 'has' is updated
121
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
122
 
                [True, False, False, False, False])
123
 
        if self.readonly:
124
 
            self.assertRaises(TransportNotPossible,
125
 
                    t.put_multi,
126
 
                    [('a', 'new\ncontents for\na\n'),
127
 
                        ('d', 'contents\nfor d\n')])
128
 
            open('a', 'wb').write('new\ncontents for\na\n')
129
 
            open('d', 'wb').write('contents\nfor d\n')
130
 
        else:
131
 
            # Put also replaces contents
132
 
            self.assertEqual(t.put_multi([('a', 'new\ncontents for\na\n'),
133
 
                                          ('d', 'contents\nfor d\n')]),
134
 
                             2)
135
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
136
 
                [True, False, False, True, False])
137
 
        self.check_file_contents('a', 'new\ncontents for\na\n')
138
 
        self.check_file_contents('d', 'contents\nfor d\n')
139
 
 
140
 
        if self.readonly:
141
 
            self.assertRaises(TransportNotPossible,
142
 
                t.put_multi, iter([('a', 'diff\ncontents for\na\n'),
143
 
                                  ('d', 'another contents\nfor d\n')]))
144
 
            open('a', 'wb').write('diff\ncontents for\na\n')
145
 
            open('d', 'wb').write('another contents\nfor d\n')
146
 
        else:
147
 
            self.assertEqual(
148
 
                t.put_multi(iter([('a', 'diff\ncontents for\na\n'),
149
 
                                  ('d', 'another contents\nfor d\n')]))
150
 
                             , 2)
151
 
        self.check_file_contents('a', 'diff\ncontents for\na\n')
152
 
        self.check_file_contents('d', 'another contents\nfor d\n')
153
 
 
154
 
        if self.readonly:
155
 
            self.assertRaises(TransportNotPossible,
156
 
                    t.put, 'path/doesnt/exist/c', 'contents')
157
 
        else:
158
 
            self.assertRaises(NoSuchFile,
159
 
                    t.put, 'path/doesnt/exist/c', 'contents')
160
 
 
161
 
    def test_put_file(self):
162
 
        t = self.get_transport()
163
 
 
164
 
        # Test that StringIO can be used as a file-like object with put
165
 
        f1 = StringIO('this is a string\nand some more stuff\n')
166
 
        if self.readonly:
167
 
            open('f1', 'wb').write(f1.read())
168
 
        else:
169
 
            t.put('f1', f1)
170
 
 
171
 
        del f1
172
 
 
173
 
        self.check_file_contents('f1', 
174
 
                'this is a string\nand some more stuff\n')
175
 
 
176
 
        f2 = StringIO('here is some text\nand a bit more\n')
177
 
        f3 = StringIO('some text for the\nthird file created\n')
178
 
 
179
 
        if self.readonly:
180
 
            open('f2', 'wb').write(f2.read())
181
 
            open('f3', 'wb').write(f3.read())
182
 
        else:
183
 
            t.put_multi([('f2', f2), ('f3', f3)])
184
 
 
185
 
        del f2, f3
186
 
 
187
 
        self.check_file_contents('f2', 'here is some text\nand a bit more\n')
188
 
        self.check_file_contents('f3', 'some text for the\nthird file created\n')
189
 
 
190
 
        # Test that an actual file object can be used with put
191
 
        f4 = open('f1', 'rb')
192
 
        if self.readonly:
193
 
            open('f4', 'wb').write(f4.read())
194
 
        else:
195
 
            t.put('f4', f4)
196
 
 
197
 
        del f4
198
 
 
199
 
        self.check_file_contents('f4', 
200
 
                'this is a string\nand some more stuff\n')
201
 
 
202
 
        f5 = open('f2', 'rb')
203
 
        f6 = open('f3', 'rb')
204
 
        if self.readonly:
205
 
            open('f5', 'wb').write(f5.read())
206
 
            open('f6', 'wb').write(f6.read())
207
 
        else:
208
 
            t.put_multi([('f5', f5), ('f6', f6)])
209
 
 
210
 
        del f5, f6
211
 
 
212
 
        self.check_file_contents('f5', 'here is some text\nand a bit more\n')
213
 
        self.check_file_contents('f6', 'some text for the\nthird file created\n')
214
 
 
215
 
    def test_mkdir(self):
216
 
        t = self.get_transport()
217
 
 
218
 
        # Test mkdir
219
 
        os.mkdir('dir_a')
220
 
        self.assertEqual(t.has('dir_a'), True)
221
 
        self.assertEqual(t.has('dir_b'), False)
222
 
 
223
 
        if self.readonly:
224
 
            self.assertRaises(TransportNotPossible,
225
 
                    t.mkdir, 'dir_b')
226
 
            os.mkdir('dir_b')
227
 
        else:
228
 
            t.mkdir('dir_b')
229
 
        self.assertEqual(t.has('dir_b'), True)
230
 
        self.assert_(os.path.isdir('dir_b'))
231
 
 
232
 
        if self.readonly:
233
 
            self.assertRaises(TransportNotPossible,
234
 
                    t.mkdir_multi, ['dir_c', 'dir_d'])
235
 
            os.mkdir('dir_c')
236
 
            os.mkdir('dir_d')
237
 
        else:
238
 
            t.mkdir_multi(['dir_c', 'dir_d'])
239
 
 
240
 
        if self.readonly:
241
 
            self.assertRaises(TransportNotPossible,
242
 
                    t.mkdir_multi, iter(['dir_e', 'dir_f']))
243
 
            os.mkdir('dir_e')
244
 
            os.mkdir('dir_f')
245
 
        else:
246
 
            t.mkdir_multi(iter(['dir_e', 'dir_f']))
247
 
        self.assertEqual(list(t.has_multi(
248
 
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
249
 
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
250
 
            [True, True, True, False,
251
 
             True, True, True, True])
252
 
        for d in ['dir_a', 'dir_b', 'dir_c', 'dir_d', 'dir_e', 'dir_f']:
253
 
            self.assert_(os.path.isdir(d))
254
 
 
255
 
        if not self.readonly:
256
 
            self.assertRaises(NoSuchFile, t.mkdir, 'path/doesnt/exist')
257
 
            self.assertRaises(FileExists, t.mkdir, 'dir_a') # Creating a directory again should fail
258
 
 
259
 
        # Make sure the transport recognizes when a
260
 
        # directory is created by other means
261
 
        # Caching Transports will fail, because dir_e was already seen not
262
 
        # to exist. So instead, we will search for a new directory
263
 
        #os.mkdir('dir_e')
264
 
        #if not self.readonly:
265
 
        #    self.assertRaises(FileExists, t.mkdir, 'dir_e')
266
 
 
267
 
        os.mkdir('dir_g')
268
 
        if not self.readonly:
269
 
            self.assertRaises(FileExists, t.mkdir, 'dir_g')
270
 
 
271
 
        # Test get/put in sub-directories
272
 
        if self.readonly:
273
 
            open('dir_a/a', 'wb').write('contents of dir_a/a')
274
 
            open('dir_b/b', 'wb').write('contents of dir_b/b')
275
 
        else:
276
 
            self.assertEqual(
277
 
                t.put_multi([('dir_a/a', 'contents of dir_a/a'),
278
 
                             ('dir_b/b', 'contents of dir_b/b')])
279
 
                          , 2)
280
 
        for f in ('dir_a/a', 'dir_b/b'):
281
 
            self.assertEqual(t.get(f).read(), open(f).read())
282
 
 
283
 
    def test_copy_to(self):
284
 
        import tempfile
285
 
        from bzrlib.transport.local import LocalTransport
286
 
 
287
 
        t = self.get_transport()
288
 
 
289
 
        files = ['a', 'b', 'c', 'd']
290
 
        self.build_tree(files)
291
 
 
292
 
        dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
293
 
        dtmp_base = os.path.basename(dtmp)
294
 
        local_t = LocalTransport(dtmp)
295
 
 
296
 
        t.copy_to(files, local_t)
297
 
        for f in files:
298
 
            self.assertEquals(open(f).read(),
299
 
                    open(os.path.join(dtmp_base, f)).read())
300
 
 
301
 
        # Test that copying into a missing directory raises
302
 
        # NoSuchFile
303
 
        os.mkdir('e')
304
 
        open('e/f', 'wb').write('contents of e')
305
 
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], local_t)
306
 
 
307
 
        os.mkdir(os.path.join(dtmp_base, 'e'))
308
 
        t.copy_to(['e/f'], local_t)
309
 
 
310
 
        del dtmp, dtmp_base, local_t
311
 
 
312
 
        dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
313
 
        dtmp_base = os.path.basename(dtmp)
314
 
        local_t = LocalTransport(dtmp)
315
 
 
316
 
        files = ['a', 'b', 'c', 'd']
317
 
        t.copy_to(iter(files), local_t)
318
 
        for f in files:
319
 
            self.assertEquals(open(f).read(),
320
 
                    open(os.path.join(dtmp_base, f)).read())
321
 
 
322
 
        del dtmp, dtmp_base, local_t
323
 
 
324
 
    def test_append(self):
325
 
        t = self.get_transport()
326
 
 
327
 
        if self.readonly:
328
 
            open('a', 'wb').write('diff\ncontents for\na\n')
329
 
            open('b', 'wb').write('contents\nfor b\n')
330
 
        else:
331
 
            t.put_multi([
332
 
                    ('a', 'diff\ncontents for\na\n'),
333
 
                    ('b', 'contents\nfor b\n')
334
 
                    ])
335
 
 
336
 
        if self.readonly:
337
 
            self.assertRaises(TransportNotPossible,
338
 
                    t.append, 'a', 'add\nsome\nmore\ncontents\n')
339
 
            _append('a', 'add\nsome\nmore\ncontents\n')
340
 
        else:
341
 
            t.append('a', 'add\nsome\nmore\ncontents\n')
342
 
 
343
 
        self.check_file_contents('a', 
344
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n')
345
 
 
346
 
        if self.readonly:
347
 
            self.assertRaises(TransportNotPossible,
348
 
                    t.append_multi,
349
 
                        [('a', 'and\nthen\nsome\nmore\n'),
350
 
                         ('b', 'some\nmore\nfor\nb\n')])
351
 
            _append('a', 'and\nthen\nsome\nmore\n')
352
 
            _append('b', 'some\nmore\nfor\nb\n')
353
 
        else:
354
 
            t.append_multi([('a', 'and\nthen\nsome\nmore\n'),
355
 
                    ('b', 'some\nmore\nfor\nb\n')])
356
 
        self.check_file_contents('a', 
357
 
            'diff\ncontents for\na\n'
358
 
            'add\nsome\nmore\ncontents\n'
359
 
            'and\nthen\nsome\nmore\n')
360
 
        self.check_file_contents('b', 
361
 
                'contents\nfor b\n'
362
 
                'some\nmore\nfor\nb\n')
363
 
 
364
 
        if self.readonly:
365
 
            _append('a', 'a little bit more\n')
366
 
            _append('b', 'from an iterator\n')
367
 
        else:
368
 
            t.append_multi(iter([('a', 'a little bit more\n'),
369
 
                    ('b', 'from an iterator\n')]))
370
 
        self.check_file_contents('a', 
371
 
            'diff\ncontents for\na\n'
372
 
            'add\nsome\nmore\ncontents\n'
373
 
            'and\nthen\nsome\nmore\n'
374
 
            'a little bit more\n')
375
 
        self.check_file_contents('b', 
376
 
                'contents\nfor b\n'
377
 
                'some\nmore\nfor\nb\n'
378
 
                'from an iterator\n')
379
 
 
380
 
        if self.readonly:
381
 
            _append('c', 'some text\nfor a missing file\n')
382
 
            _append('a', 'some text in a\n')
383
 
            _append('d', 'missing file r\n')
384
 
        else:
385
 
            t.append('c', 'some text\nfor a missing file\n')
386
 
            t.append_multi([('a', 'some text in a\n'),
387
 
                            ('d', 'missing file r\n')])
388
 
        self.check_file_contents('a', 
389
 
            'diff\ncontents for\na\n'
390
 
            'add\nsome\nmore\ncontents\n'
391
 
            'and\nthen\nsome\nmore\n'
392
 
            'a little bit more\n'
393
 
            'some text in a\n')
394
 
        self.check_file_contents('c', 'some text\nfor a missing file\n')
395
 
        self.check_file_contents('d', 'missing file r\n')
396
 
 
397
 
    def test_append_file(self):
398
 
        t = self.get_transport()
399
 
 
400
 
        contents = [
401
 
            ('f1', 'this is a string\nand some more stuff\n'),
402
 
            ('f2', 'here is some text\nand a bit more\n'),
403
 
            ('f3', 'some text for the\nthird file created\n'),
404
 
            ('f4', 'this is a string\nand some more stuff\n'),
405
 
            ('f5', 'here is some text\nand a bit more\n'),
406
 
            ('f6', 'some text for the\nthird file created\n')
407
 
        ]
408
 
        
409
 
        if self.readonly:
410
 
            for f, val in contents:
411
 
                open(f, 'wb').write(val)
412
 
        else:
413
 
            t.put_multi(contents)
414
 
 
415
 
        a1 = StringIO('appending to\none\n')
416
 
        if self.readonly:
417
 
            _append('f1', a1.read())
418
 
        else:
419
 
            t.append('f1', a1)
420
 
 
421
 
        del a1
422
 
 
423
 
        self.check_file_contents('f1', 
424
 
                'this is a string\nand some more stuff\n'
425
 
                'appending to\none\n')
426
 
 
427
 
        a2 = StringIO('adding more\ntext to two\n')
428
 
        a3 = StringIO('some garbage\nto put in three\n')
429
 
 
430
 
        if self.readonly:
431
 
            _append('f2', a2.read())
432
 
            _append('f3', a3.read())
433
 
        else:
434
 
            t.append_multi([('f2', a2), ('f3', a3)])
435
 
 
436
 
        del a2, a3
437
 
 
438
 
        self.check_file_contents('f2',
439
 
                'here is some text\nand a bit more\n'
440
 
                'adding more\ntext to two\n')
441
 
        self.check_file_contents('f3', 
442
 
                'some text for the\nthird file created\n'
443
 
                'some garbage\nto put in three\n')
444
 
 
445
 
        # Test that an actual file object can be used with put
446
 
        a4 = open('f1', 'rb')
447
 
        if self.readonly:
448
 
            _append('f4', a4.read())
449
 
        else:
450
 
            t.append('f4', a4)
451
 
 
452
 
        del a4
453
 
 
454
 
        self.check_file_contents('f4', 
455
 
                'this is a string\nand some more stuff\n'
456
 
                'this is a string\nand some more stuff\n'
457
 
                'appending to\none\n')
458
 
 
459
 
        a5 = open('f2', 'rb')
460
 
        a6 = open('f3', 'rb')
461
 
        if self.readonly:
462
 
            _append('f5', a5.read())
463
 
            _append('f6', a6.read())
464
 
        else:
465
 
            t.append_multi([('f5', a5), ('f6', a6)])
466
 
 
467
 
        del a5, a6
468
 
 
469
 
        self.check_file_contents('f5',
470
 
                'here is some text\nand a bit more\n'
471
 
                'here is some text\nand a bit more\n'
472
 
                'adding more\ntext to two\n')
473
 
        self.check_file_contents('f6',
474
 
                'some text for the\nthird file created\n'
475
 
                'some text for the\nthird file created\n'
476
 
                'some garbage\nto put in three\n')
477
 
 
478
 
        a5 = open('f2', 'rb')
479
 
        a6 = open('f2', 'rb')
480
 
        a7 = open('f3', 'rb')
481
 
        if self.readonly:
482
 
            _append('c', a5.read())
483
 
            _append('a', a6.read())
484
 
            _append('d', a7.read())
485
 
        else:
486
 
            t.append('c', a5)
487
 
            t.append_multi([('a', a6), ('d', a7)])
488
 
        del a5, a6, a7
489
 
        self.check_file_contents('c', open('f2', 'rb').read())
490
 
        self.check_file_contents('d', open('f3', 'rb').read())
491
 
 
492
 
 
493
 
    def test_delete(self):
494
 
        # TODO: Test Transport.delete
495
 
        t = self.get_transport()
496
 
 
497
 
        # Not much to do with a readonly transport
498
 
        if self.readonly:
499
 
            return
500
 
 
501
 
        open('a', 'wb').write('a little bit of text\n')
502
 
        self.failUnless(t.has('a'))
503
 
        self.failUnlessExists('a')
504
 
        t.delete('a')
505
 
        self.failIf(os.path.lexists('a'))
506
 
 
507
 
        self.assertRaises(NoSuchFile, t.delete, 'a')
508
 
 
509
 
        open('a', 'wb').write('a text\n')
510
 
        open('b', 'wb').write('b text\n')
511
 
        open('c', 'wb').write('c text\n')
512
 
        self.assertEqual([True, True, True],
513
 
                list(t.has_multi(['a', 'b', 'c'])))
514
 
        t.delete_multi(['a', 'c'])
515
 
        self.assertEqual([False, True, False],
516
 
                list(t.has_multi(['a', 'b', 'c'])))
517
 
        self.failIf(os.path.lexists('a'))
518
 
        self.failUnlessExists('b')
519
 
        self.failIf(os.path.lexists('c'))
520
 
 
521
 
        self.assertRaises(NoSuchFile,
522
 
                t.delete_multi, ['a', 'b', 'c'])
523
 
 
524
 
        self.assertRaises(NoSuchFile,
525
 
                t.delete_multi, iter(['a', 'b', 'c']))
526
 
 
527
 
        open('a', 'wb').write('another a text\n')
528
 
        open('c', 'wb').write('another c text\n')
529
 
        t.delete_multi(iter(['a', 'b', 'c']))
530
 
 
531
 
        # We should have deleted everything
532
 
        # SftpServer creates control files in the
533
 
        # working directory, so we can just do a
534
 
        # plain "listdir".
535
 
        # self.assertEqual([], os.listdir('.'))
536
 
 
537
 
    def test_move(self):
538
 
        t = self.get_transport()
539
 
 
540
 
        if self.readonly:
541
 
            return
542
 
 
543
 
        # TODO: I would like to use os.listdir() to
544
 
        # make sure there are no extra files, but SftpServer
545
 
        # creates control files in the working directory
546
 
        # perhaps all of this could be done in a subdirectory
547
 
 
548
 
        open('a', 'wb').write('a first file\n')
549
 
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
550
 
 
551
 
        t.move('a', 'b')
552
 
        self.failUnlessExists('b')
553
 
        self.failIf(os.path.lexists('a'))
554
 
 
555
 
        self.check_file_contents('b', 'a first file\n')
556
 
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
557
 
 
558
 
        # Overwrite a file
559
 
        open('c', 'wb').write('c this file\n')
560
 
        t.move('c', 'b')
561
 
        self.failIf(os.path.lexists('c'))
562
 
        self.check_file_contents('b', 'c this file\n')
563
 
 
564
 
        # TODO: Try to write a test for atomicity
565
 
        # TODO: Test moving into a non-existant subdirectory
566
 
        # TODO: Test Transport.move_multi
567
 
 
568
 
    def test_copy(self):
569
 
        t = self.get_transport()
570
 
 
571
 
        if self.readonly:
572
 
            return
573
 
 
574
 
        open('a', 'wb').write('a file\n')
575
 
        t.copy('a', 'b')
576
 
        self.check_file_contents('b', 'a file\n')
577
 
 
578
 
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
579
 
        os.mkdir('c')
580
 
        # What should the assert be if you try to copy a
581
 
        # file over a directory?
582
 
        #self.assertRaises(Something, t.copy, 'a', 'c')
583
 
        open('d', 'wb').write('text in d\n')
584
 
        t.copy('d', 'b')
585
 
        self.check_file_contents('b', 'text in d\n')
586
 
 
587
 
        # TODO: test copy_multi
588
 
 
589
 
    def test_connection_error(self):
590
 
        """ConnectionError is raised when connection is impossible"""
591
 
        if not hasattr(self, "get_bogus_transport"):
592
 
            return
593
 
        t = self.get_bogus_transport()
594
 
        try:
595
 
            t.get('.bzr/branch')
596
 
        except (ConnectionError, NoSuchFile), e:
597
 
            pass
598
 
        except (Exception), e:
599
 
            self.failIf(True, 'Wrong exception thrown: %s' % e)
600
 
        else:
601
 
            self.failIf(True, 'Did not get the expected exception.')
602
 
 
603
 
    def test_stat(self):
604
 
        # TODO: Test stat, just try once, and if it throws, stop testing
605
 
        from stat import S_ISDIR, S_ISREG
606
 
 
607
 
        t = self.get_transport()
608
 
 
609
 
        try:
610
 
            st = t.stat('.')
611
 
        except TransportNotPossible, e:
612
 
            # This transport cannot stat
613
 
            return
614
 
 
615
 
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
616
 
        self.build_tree(paths)
617
 
 
618
 
        local_stats = []
619
 
 
620
 
        for p in paths:
621
 
            st = t.stat(p)
622
 
            local_st = os.stat(p)
623
 
            if p.endswith('/'):
624
 
                self.failUnless(S_ISDIR(st.st_mode))
625
 
            else:
626
 
                self.failUnless(S_ISREG(st.st_mode))
627
 
            self.assertEqual(local_st.st_size, st.st_size)
628
 
            self.assertEqual(local_st.st_mode, st.st_mode)
629
 
            local_stats.append(local_st)
630
 
 
631
 
        remote_stats = list(t.stat_multi(paths))
632
 
        remote_iter_stats = list(t.stat_multi(iter(paths)))
633
 
 
634
 
        for local, remote, remote_iter in \
635
 
            zip(local_stats, remote_stats, remote_iter_stats):
636
 
            self.assertEqual(local.st_mode, remote.st_mode)
637
 
            self.assertEqual(local.st_mode, remote_iter.st_mode)
638
 
 
639
 
            self.assertEqual(local.st_size, remote.st_size)
640
 
            self.assertEqual(local.st_size, remote_iter.st_size)
641
 
            # Should we test UID/GID?
642
 
 
643
 
        self.assertRaises(NoSuchFile, t.stat, 'q')
644
 
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
645
 
 
646
 
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
647
 
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
648
 
 
649
 
    def test_list_dir(self):
650
 
        # TODO: Test list_dir, just try once, and if it throws, stop testing
651
 
        t = self.get_transport()
652
 
        
653
 
        if not t.listable():
654
 
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
655
 
            return
656
 
 
657
 
        def sorted_list(d):
658
 
            l = list(t.list_dir(d))
659
 
            l.sort()
660
 
            return l
661
 
 
662
 
        # SftpServer creates control files in the working directory
663
 
        # so lets move down a directory to be safe
664
 
        os.mkdir('wd')
665
 
        os.chdir('wd')
666
 
        t = t.clone('wd')
667
 
 
668
 
        self.assertEqual([], sorted_list(u'.'))
669
 
        self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e'])
670
 
 
671
 
        self.assertEqual([u'a', u'b', u'c'], sorted_list(u'.'))
672
 
        self.assertEqual([u'd', u'e'], sorted_list(u'c'))
673
 
 
674
 
        os.remove('c/d')
675
 
        os.remove('b')
676
 
        self.assertEqual([u'a', u'c'], sorted_list('.'))
677
 
        self.assertEqual([u'e'], sorted_list(u'c'))
678
 
 
679
 
        self.assertListRaises(NoSuchFile, t.list_dir, 'q')
680
 
        self.assertListRaises(NoSuchFile, t.list_dir, 'c/f')
681
 
        self.assertListRaises(NoSuchFile, t.list_dir, 'a')
682
 
 
683
 
    def test_clone(self):
684
 
        # TODO: Test that clone moves up and down the filesystem
685
 
        t1 = self.get_transport()
686
 
 
687
 
        self.build_tree(['a', 'b/', 'b/c'])
688
 
 
689
 
        self.failUnless(t1.has('a'))
690
 
        self.failUnless(t1.has('b/c'))
691
 
        self.failIf(t1.has('c'))
692
 
 
693
 
        t2 = t1.clone('b')
694
 
        self.failUnless(t2.has('c'))
695
 
        self.failIf(t2.has('a'))
696
 
 
697
 
        t3 = t2.clone('..')
698
 
        self.failUnless(t3.has('a'))
699
 
        self.failIf(t3.has('c'))
700
 
 
701
 
        self.failIf(t1.has('b/d'))
702
 
        self.failIf(t2.has('d'))
703
 
        self.failIf(t3.has('b/d'))
704
 
 
705
 
        if self.readonly:
706
 
            open('b/d', 'wb').write('newfile\n')
707
 
        else:
708
 
            t2.put('d', 'newfile\n')
709
 
 
710
 
        self.failUnless(t1.has('b/d'))
711
 
        self.failUnless(t2.has('d'))
712
 
        self.failUnless(t3.has('b/d'))
713
 
 
714
 
        
715
 
class LocalTransportTest(TestCaseInTempDir, TestTransportMixIn):
716
 
    def get_transport(self):
717
 
        from bzrlib.transport.local import LocalTransport
718
 
        return LocalTransport(u'.')
719
 
 
720
 
 
721
 
class HttpTransportTest(TestCaseWithWebserver, TestTransportMixIn):
722
 
 
723
 
    readonly = True
724
 
 
725
 
    def get_transport(self):
726
 
        from bzrlib.transport.http import HttpTransport
727
 
        url = self.get_remote_url(u'.')
728
 
        return HttpTransport(url)
729
 
 
730
 
    def get_bogus_transport(self):
731
 
        from bzrlib.transport.http import HttpTransport
732
 
        return HttpTransport('http://jasldkjsalkdjalksjdkljasd')
733
 
 
734
 
 
735
 
class TestMemoryTransport(TestCase):
736
 
 
737
 
    def test_get_transport(self):
738
 
        memory.MemoryTransport()
739
 
 
740
 
    def test_clone(self):
741
 
        transport = memory.MemoryTransport()
742
 
        self.failUnless(transport.clone() is transport)
743
 
 
744
 
    def test_abspath(self):
745
 
        transport = memory.MemoryTransport()
746
 
        self.assertEqual("in-memory:relpath", transport.abspath('relpath'))
747
 
 
748
 
    def test_relpath(self):
749
 
        transport = memory.MemoryTransport()
750
 
 
751
 
    def test_append_and_get(self):
752
 
        transport = memory.MemoryTransport()
753
 
        transport.append('path', StringIO('content'))
754
 
        self.assertEqual(transport.get('path').read(), 'content')
755
 
        transport.append('path', StringIO('content'))
756
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
757
 
 
758
 
    def test_put_and_get(self):
759
 
        transport = memory.MemoryTransport()
760
 
        transport.put('path', StringIO('content'))
761
 
        self.assertEqual(transport.get('path').read(), 'content')
762
 
        transport.put('path', StringIO('content'))
763
 
        self.assertEqual(transport.get('path').read(), 'content')
764
 
 
765
 
    def test_append_without_dir_fails(self):
766
 
        transport = memory.MemoryTransport()
767
 
        self.assertRaises(NoSuchFile,
768
 
                          transport.append, 'dir/path', StringIO('content'))
769
 
 
770
 
    def test_put_without_dir_fails(self):
771
 
        transport = memory.MemoryTransport()
772
 
        self.assertRaises(NoSuchFile,
773
 
                          transport.put, 'dir/path', StringIO('content'))
774
 
 
775
 
    def test_get_missing(self):
776
 
        transport = memory.MemoryTransport()
777
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
778
 
 
779
 
    def test_has_missing(self):
780
 
        transport = memory.MemoryTransport()
781
 
        self.assertEquals(False, transport.has('foo'))
782
 
 
783
 
    def test_has_present(self):
784
 
        transport = memory.MemoryTransport()
785
 
        transport.append('foo', StringIO('content'))
786
 
        self.assertEquals(True, transport.has('foo'))
787
 
 
788
 
    def test_mkdir(self):
789
 
        transport = memory.MemoryTransport()
790
 
        transport.mkdir('dir')
791
 
        transport.append('dir/path', StringIO('content'))
792
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
793
 
 
794
 
    def test_mkdir_missing_parent(self):
795
 
        transport = memory.MemoryTransport()
796
 
        self.assertRaises(NoSuchFile,
797
 
                          transport.mkdir, 'dir/dir')
798
 
 
799
 
    def test_mkdir_twice(self):
800
 
        transport = memory.MemoryTransport()
801
 
        transport.mkdir('dir')
802
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
40
    def test__get_set_protocol_handlers(self):
 
41
        handlers = _get_protocol_handlers()
 
42
        self.assertNotEqual({}, handlers)
 
43
        try:
 
44
            _set_protocol_handlers({})
 
45
            self.assertEqual({}, _get_protocol_handlers())
 
46
        finally:
 
47
            _set_protocol_handlers(handlers)
 
48
 
 
49
    def test_get_transport_modules(self):
 
50
        handlers = _get_protocol_handlers()
 
51
        class SampleHandler(object):
 
52
            """I exist, isnt that enough?"""
 
53
        try:
 
54
            my_handlers = {}
 
55
            _set_protocol_handlers(my_handlers)
 
56
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
57
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
58
            self.assertEqual([SampleHandler.__module__],
 
59
                             _get_transport_modules())
 
60
        finally:
 
61
            _set_protocol_handlers(handlers)
 
62
            
 
63
 
 
64
class MemoryTransportTest(TestCase):
 
65
    """Memory transport specific tests."""
803
66
 
804
67
    def test_parameters(self):
 
68
        import bzrlib.transport.memory as memory
805
69
        transport = memory.MemoryTransport()
806
70
        self.assertEqual(True, transport.listable())
807
71
        self.assertEqual(False, transport.should_cache())
808
 
 
809
 
    def test_iter_files_recursive(self):
810
 
        transport = memory.MemoryTransport()
811
 
        transport.mkdir('dir')
812
 
        transport.put('dir/foo', StringIO('content'))
813
 
        transport.put('dir/bar', StringIO('content'))
814
 
        transport.put('bar', StringIO('content'))
815
 
        paths = set(transport.iter_files_recursive())
816
 
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
817
 
 
818
 
    def test_stat(self):
819
 
        transport = memory.MemoryTransport()
820
 
        transport.put('foo', StringIO('content'))
821
 
        transport.put('bar', StringIO('phowar'))
822
 
        self.assertEqual(7, transport.stat('foo').st_size)
823
 
        self.assertEqual(6, transport.stat('bar').st_size)
824
 
 
 
72
        self.assertEqual(False, transport.is_readonly())