1
# Copyright (C) 2004, 2005 by Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
from cStringIO import StringIO
21
from bzrlib.errors import (NoSuchFile, FileExists, TransportNotPossible,
23
from bzrlib.tests import TestCase, TestCaseInTempDir
24
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
25
from bzrlib.transport import memory, urlescape
29
"""Append the given text (file-like object) to the supplied filename."""
36
class TestTransport(TestCase):
    """Checks for transport helpers that do not need a concrete transport."""

    def test_urlescape(self):
        # A bare percent sign must come back as its escape sequence.
        escaped = urlescape('%')
        self.assertEqual('%25', escaped)
43
class TestTransportMixIn(object):
44
"""Subclass this, and it will provide a series of tests for a Transport.
45
It assumes that the Transport object is connected to the
46
current working directory. So that whatever is done
47
through the transport, should show up in the working
48
directory, and vice-versa.
50
This also tests to make sure that the functions work with both
51
generators and lists (assuming iter(list) is effectively a generator)
54
def get_transport(self):
    """Children should override this to return the Transport object.

    This base version has no transport to hand out, so it always
    raises NotImplementedError.
    """
    raise NotImplementedError
60
t = self.get_transport()
62
files = ['a', 'b', 'e', 'g', '%']
63
self.build_tree(files)
64
self.assertEqual(t.has('a'), True)
65
self.assertEqual(t.has('c'), False)
66
self.assertEqual(t.has(urlescape('%')), True)
67
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
68
[True, True, False, False, True, False, True, False])
69
self.assertEqual(t.has_any(['a', 'b', 'c']), True)
70
self.assertEqual(t.has_any(['c', 'd', 'f', urlescape('%%')]), False)
71
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
72
[True, True, False, False, True, False, True, False])
73
self.assertEqual(t.has_any(['c', 'c', 'c']), False)
74
self.assertEqual(t.has_any(['b', 'b', 'b']), True)
77
t = self.get_transport()
79
files = ['a', 'b', 'e', 'g']
80
self.build_tree(files)
81
self.assertEqual(t.get('a').read(), open('a').read())
82
content_f = t.get_multi(files)
83
for path,f in zip(files, content_f):
84
self.assertEqual(open(path).read(), f.read())
86
content_f = t.get_multi(iter(files))
87
for path,f in zip(files, content_f):
88
self.assertEqual(open(path).read(), f.read())
90
self.assertRaises(NoSuchFile, t.get, 'c')
92
files = list(t.get_multi(['a', 'b', 'c']))
96
self.fail('Failed to raise NoSuchFile for missing file in get_multi')
98
files = list(t.get_multi(iter(['a', 'b', 'c', 'e'])))
102
self.fail('Failed to raise NoSuchFile for missing file in get_multi')
105
t = self.get_transport()
108
self.assertRaises(TransportNotPossible,
109
t.put, 'a', 'some text for a\n')
110
open('a', 'wb').write('some text for a\n')
112
t.put('a', 'some text for a\n')
113
self.assert_(os.path.exists('a'))
114
self.check_file_contents('a', 'some text for a\n')
115
self.assertEqual(t.get('a').read(), 'some text for a\n')
116
# Make sure 'has' is updated
117
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
118
[True, False, False, False, False])
120
self.assertRaises(TransportNotPossible,
122
[('a', 'new\ncontents for\na\n'),
123
('d', 'contents\nfor d\n')])
124
open('a', 'wb').write('new\ncontents for\na\n')
125
open('d', 'wb').write('contents\nfor d\n')
127
# Put also replaces contents
128
self.assertEqual(t.put_multi([('a', 'new\ncontents for\na\n'),
129
('d', 'contents\nfor d\n')]),
131
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
132
[True, False, False, True, False])
133
self.check_file_contents('a', 'new\ncontents for\na\n')
134
self.check_file_contents('d', 'contents\nfor d\n')
137
self.assertRaises(TransportNotPossible,
138
t.put_multi, iter([('a', 'diff\ncontents for\na\n'),
139
('d', 'another contents\nfor d\n')]))
140
open('a', 'wb').write('diff\ncontents for\na\n')
141
open('d', 'wb').write('another contents\nfor d\n')
144
t.put_multi(iter([('a', 'diff\ncontents for\na\n'),
145
('d', 'another contents\nfor d\n')]))
147
self.check_file_contents('a', 'diff\ncontents for\na\n')
148
self.check_file_contents('d', 'another contents\nfor d\n')
151
self.assertRaises(TransportNotPossible,
152
t.put, 'path/doesnt/exist/c', 'contents')
154
self.assertRaises(NoSuchFile,
155
t.put, 'path/doesnt/exist/c', 'contents')
157
def test_put_file(self):
158
t = self.get_transport()
160
# Test that StringIO can be used as a file-like object with put
161
f1 = StringIO('this is a string\nand some more stuff\n')
163
open('f1', 'wb').write(f1.read())
169
self.check_file_contents('f1',
170
'this is a string\nand some more stuff\n')
172
f2 = StringIO('here is some text\nand a bit more\n')
173
f3 = StringIO('some text for the\nthird file created\n')
176
open('f2', 'wb').write(f2.read())
177
open('f3', 'wb').write(f3.read())
179
t.put_multi([('f2', f2), ('f3', f3)])
183
self.check_file_contents('f2', 'here is some text\nand a bit more\n')
184
self.check_file_contents('f3', 'some text for the\nthird file created\n')
186
# Test that an actual file object can be used with put
187
f4 = open('f1', 'rb')
189
open('f4', 'wb').write(f4.read())
195
self.check_file_contents('f4',
196
'this is a string\nand some more stuff\n')
198
f5 = open('f2', 'rb')
199
f6 = open('f3', 'rb')
201
open('f5', 'wb').write(f5.read())
202
open('f6', 'wb').write(f6.read())
204
t.put_multi([('f5', f5), ('f6', f6)])
208
self.check_file_contents('f5', 'here is some text\nand a bit more\n')
209
self.check_file_contents('f6', 'some text for the\nthird file created\n')
213
def test_mkdir(self):
214
t = self.get_transport()
218
self.assertEqual(t.has('dir_a'), True)
219
self.assertEqual(t.has('dir_b'), False)
222
self.assertRaises(TransportNotPossible,
227
self.assertEqual(t.has('dir_b'), True)
228
self.assert_(os.path.isdir('dir_b'))
231
self.assertRaises(TransportNotPossible,
232
t.mkdir_multi, ['dir_c', 'dir_d'])
236
t.mkdir_multi(['dir_c', 'dir_d'])
239
self.assertRaises(TransportNotPossible,
240
t.mkdir_multi, iter(['dir_e', 'dir_f']))
244
t.mkdir_multi(iter(['dir_e', 'dir_f']))
245
self.assertEqual(list(t.has_multi(
246
['dir_a', 'dir_b', 'dir_c', 'dir_q',
247
'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
248
[True, True, True, False,
249
True, True, True, True])
250
for d in ['dir_a', 'dir_b', 'dir_c', 'dir_d', 'dir_e', 'dir_f']:
251
self.assert_(os.path.isdir(d))
253
if not self.readonly:
254
self.assertRaises(NoSuchFile, t.mkdir, 'path/doesnt/exist')
255
self.assertRaises(FileExists, t.mkdir, 'dir_a') # Creating a directory again should fail
257
# Make sure the transport recognizes when a
258
# directory is created by other means
259
# Caching Transports will fail, because dir_e was already seen not
260
# to exist. So instead, we will search for a new directory
262
#if not self.readonly:
263
# self.assertRaises(FileExists, t.mkdir, 'dir_e')
266
if not self.readonly:
267
self.assertRaises(FileExists, t.mkdir, 'dir_g')
269
# Test get/put in sub-directories
271
open('dir_a/a', 'wb').write('contents of dir_a/a')
272
open('dir_b/b', 'wb').write('contents of dir_b/b')
275
t.put_multi([('dir_a/a', 'contents of dir_a/a'),
276
('dir_b/b', 'contents of dir_b/b')])
278
for f in ('dir_a/a', 'dir_b/b'):
279
self.assertEqual(t.get(f).read(), open(f).read())
281
def test_copy_to(self):
283
from bzrlib.transport.local import LocalTransport
285
t = self.get_transport()
287
files = ['a', 'b', 'c', 'd']
288
self.build_tree(files)
290
dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
291
dtmp_base = os.path.basename(dtmp)
292
local_t = LocalTransport(dtmp)
294
t.copy_to(files, local_t)
296
self.assertEquals(open(f).read(),
297
open(os.path.join(dtmp_base, f)).read())
299
# Test that copying into a missing directory raises
302
open('e/f', 'wb').write('contents of e')
303
self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], local_t)
305
os.mkdir(os.path.join(dtmp_base, 'e'))
306
t.copy_to(['e/f'], local_t)
308
del dtmp, dtmp_base, local_t
310
dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
311
dtmp_base = os.path.basename(dtmp)
312
local_t = LocalTransport(dtmp)
314
files = ['a', 'b', 'c', 'd']
315
t.copy_to(iter(files), local_t)
317
self.assertEquals(open(f).read(),
318
open(os.path.join(dtmp_base, f)).read())
320
del dtmp, dtmp_base, local_t
322
def test_append(self):
323
t = self.get_transport()
326
open('a', 'wb').write('diff\ncontents for\na\n')
327
open('b', 'wb').write('contents\nfor b\n')
330
('a', 'diff\ncontents for\na\n'),
331
('b', 'contents\nfor b\n')
335
self.assertRaises(TransportNotPossible,
336
t.append, 'a', 'add\nsome\nmore\ncontents\n')
337
_append('a', 'add\nsome\nmore\ncontents\n')
339
t.append('a', 'add\nsome\nmore\ncontents\n')
341
self.check_file_contents('a',
342
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n')
345
self.assertRaises(TransportNotPossible,
347
[('a', 'and\nthen\nsome\nmore\n'),
348
('b', 'some\nmore\nfor\nb\n')])
349
_append('a', 'and\nthen\nsome\nmore\n')
350
_append('b', 'some\nmore\nfor\nb\n')
352
t.append_multi([('a', 'and\nthen\nsome\nmore\n'),
353
('b', 'some\nmore\nfor\nb\n')])
354
self.check_file_contents('a',
355
'diff\ncontents for\na\n'
356
'add\nsome\nmore\ncontents\n'
357
'and\nthen\nsome\nmore\n')
358
self.check_file_contents('b',
360
'some\nmore\nfor\nb\n')
363
_append('a', 'a little bit more\n')
364
_append('b', 'from an iterator\n')
366
t.append_multi(iter([('a', 'a little bit more\n'),
367
('b', 'from an iterator\n')]))
368
self.check_file_contents('a',
369
'diff\ncontents for\na\n'
370
'add\nsome\nmore\ncontents\n'
371
'and\nthen\nsome\nmore\n'
372
'a little bit more\n')
373
self.check_file_contents('b',
375
'some\nmore\nfor\nb\n'
376
'from an iterator\n')
378
def test_append_file(self):
379
t = self.get_transport()
382
('f1', 'this is a string\nand some more stuff\n'),
383
('f2', 'here is some text\nand a bit more\n'),
384
('f3', 'some text for the\nthird file created\n'),
385
('f4', 'this is a string\nand some more stuff\n'),
386
('f5', 'here is some text\nand a bit more\n'),
387
('f6', 'some text for the\nthird file created\n')
391
for f, val in contents:
392
open(f, 'wb').write(val)
394
t.put_multi(contents)
396
a1 = StringIO('appending to\none\n')
398
_append('f1', a1.read())
404
self.check_file_contents('f1',
405
'this is a string\nand some more stuff\n'
406
'appending to\none\n')
408
a2 = StringIO('adding more\ntext to two\n')
409
a3 = StringIO('some garbage\nto put in three\n')
412
_append('f2', a2.read())
413
_append('f3', a3.read())
415
t.append_multi([('f2', a2), ('f3', a3)])
419
self.check_file_contents('f2',
420
'here is some text\nand a bit more\n'
421
'adding more\ntext to two\n')
422
self.check_file_contents('f3',
423
'some text for the\nthird file created\n'
424
'some garbage\nto put in three\n')
426
# Test that an actual file object can be used with put
427
a4 = open('f1', 'rb')
429
_append('f4', a4.read())
435
self.check_file_contents('f4',
436
'this is a string\nand some more stuff\n'
437
'this is a string\nand some more stuff\n'
438
'appending to\none\n')
440
a5 = open('f2', 'rb')
441
a6 = open('f3', 'rb')
443
_append('f5', a5.read())
444
_append('f6', a6.read())
446
t.append_multi([('f5', a5), ('f6', a6)])
450
self.check_file_contents('f5',
451
'here is some text\nand a bit more\n'
452
'here is some text\nand a bit more\n'
453
'adding more\ntext to two\n')
454
self.check_file_contents('f6',
455
'some text for the\nthird file created\n'
456
'some text for the\nthird file created\n'
457
'some garbage\nto put in three\n')
459
def test_delete(self):
    # TODO: Test Transport.delete
    # Placeholder: for now this only obtains the transport under test
    # and makes no assertions about deletion.
    transport = self.get_transport()
464
# TODO: Test Transport.move
465
t = self.get_transport()
468
# TODO: Test Transport.move
469
t = self.get_transport()
471
def test_connection_error(self):
472
"""ConnectionError is raised when connection is impossible"""
473
if not hasattr(self, "get_bogus_transport"):
475
t = self.get_bogus_transport()
478
except (ConnectionError, NoSuchFile), e:
480
except (Exception), e:
481
self.failIf(True, 'Wrong exception thrown: %s' % e)
483
self.failIf(True, 'Did not get the expected exception.')
486
class LocalTransportTest(TestCaseInTempDir, TestTransportMixIn):
    """Run the TestTransportMixIn tests against a LocalTransport."""

    def get_transport(self):
        # Imported at call time, as the other concrete test classes do.
        from bzrlib.transport import local
        return local.LocalTransport(u'.')
492
class HttpTransportTest(TestCaseWithWebserver, TestTransportMixIn):
496
def get_transport(self):
    # Build an HttpTransport on the URL that get_remote_url() reports
    # for u'.'.
    from bzrlib.transport import http
    return http.HttpTransport(self.get_remote_url(u'.'))
501
def get_bogus_transport(self):
    # Hand back an HttpTransport aimed at a nonsense host name;
    # test_connection_error looks this method up via hasattr().
    from bzrlib.transport import http
    bogus_url = 'http://jasldkjsalkdjalksjdkljasd'
    return http.HttpTransport(bogus_url)
506
class TestMemoryTransport(TestCase):
    """Unit tests for memory.MemoryTransport."""

    def test_get_transport(self):
        # Constructing the transport with no arguments must not raise.
        memory.MemoryTransport()

    def test_clone(self):
        # clone() with no arguments is expected to return the same object.
        mem = memory.MemoryTransport()
        cloned = mem.clone()
        self.failUnless(cloned is mem)

    def test_abspath(self):
        mem = memory.MemoryTransport()
        absolute = mem.abspath('relpath')
        self.assertEqual("in-memory:relpath", absolute)

    def test_relpath(self):
        # Only construction is exercised here; nothing is asserted
        # about relpath itself.
        mem = memory.MemoryTransport()

    def test_append_and_get(self):
        # Each append adds to what is already stored under the path.
        mem = memory.MemoryTransport()
        for expected in ('content', 'contentcontent'):
            mem.append('path', StringIO('content'))
            self.assertEqual(mem.get('path').read(), expected)

    def test_put_and_get(self):
        # A second put replaces the stored text rather than extending it.
        mem = memory.MemoryTransport()
        for _attempt in (1, 2):
            mem.put('path', StringIO('content'))
            self.assertEqual(mem.get('path').read(), 'content')

    def test_append_without_dir_fails(self):
        mem = memory.MemoryTransport()
        payload = StringIO('content')
        self.assertRaises(NoSuchFile, mem.append, 'dir/path', payload)

    def test_put_without_dir_fails(self):
        mem = memory.MemoryTransport()
        payload = StringIO('content')
        self.assertRaises(NoSuchFile, mem.put, 'dir/path', payload)

    def test_get_missing(self):
        mem = memory.MemoryTransport()
        self.assertRaises(NoSuchFile, mem.get, 'foo')

    def test_has_missing(self):
        mem = memory.MemoryTransport()
        present = mem.has('foo')
        self.assertEquals(False, present)

    def test_has_present(self):
        mem = memory.MemoryTransport()
        mem.append('foo', StringIO('content'))
        present = mem.has('foo')
        self.assertEquals(True, present)

    def test_mkdir(self):
        # After mkdir, a file can be stored below the new directory.
        mem = memory.MemoryTransport()
        mem.mkdir('dir')
        mem.append('dir/path', StringIO('content'))
        stored = mem.get('dir/path').read()
        self.assertEqual(stored, 'content')

    def test_mkdir_missing_parent(self):
        mem = memory.MemoryTransport()
        self.assertRaises(NoSuchFile, mem.mkdir, 'dir/dir')

    def test_mkdir_twice(self):
        mem = memory.MemoryTransport()
        mem.mkdir('dir')
        self.assertRaises(FileExists, mem.mkdir, 'dir')

    def test_parameters(self):
        mem = memory.MemoryTransport()
        self.assertEqual(True, mem.listable())
        self.assertEqual(False, mem.should_cache())

    def test_iter_files_recursive(self):
        # Files at the top level and inside a directory are all reported.
        mem = memory.MemoryTransport()
        mem.mkdir('dir')
        for name in ('dir/foo', 'dir/bar', 'bar'):
            mem.put(name, StringIO('content'))
        found = set(mem.iter_files_recursive())
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), found)
590
transport = memory.MemoryTransport()
591
transport.put('foo', StringIO('content'))
592
transport.put('bar', StringIO('phowar'))
593
self.assertEqual(7, transport.stat('foo').st_size)
594
self.assertEqual(6, transport.stat('bar').st_size)