231
230
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
234
t = self.get_transport()
239
self.applyDeprecated(zero_eleven, t.put, 'a', 'string\ncontents\n')
240
self.check_transport_contents('string\ncontents\n', t, 'a')
242
self.applyDeprecated(zero_eleven,
243
t.put, 'b', StringIO('file-like\ncontents\n'))
244
self.check_transport_contents('file-like\ncontents\n', t, 'b')
246
self.assertRaises(NoSuchFile,
247
self.applyDeprecated,
249
t.put, 'path/doesnt/exist/c', StringIO('contents'))
232
def test_get_with_open_write_stream_sees_all_content(self):
233
t = self.get_transport()
236
handle = t.open_write_stream('foo')
239
self.assertEqual('b', t.get('foo').read())
243
def test_get_bytes_with_open_write_stream_sees_all_content(self):
244
t = self.get_transport()
247
handle = t.open_write_stream('foo')
250
self.assertEqual('b', t.get_bytes('foo'))
251
self.assertEqual('b', t.get('foo').read())
251
255
def test_put_bytes(self):
252
256
t = self.get_transport()
436
440
# Yes, you can put a file such that it becomes readonly
437
441
t.put_file('mode400', StringIO('test text\n'), mode=0400)
438
442
self.assertTransportMode(t, 'mode400', 0400)
440
# XXX: put_multi is deprecated, so do we really care anymore?
441
self.applyDeprecated(zero_eleven, t.put_multi,
442
[('mmode644', StringIO('text\n'))], mode=0644)
443
self.assertTransportMode(t, 'mmode644', 0644)
445
443
# The default permissions should be based on the current umask
446
444
umask = osutils.get_umask()
447
445
t.put_file('nomode', StringIO('test text\n'), mode=None)
511
509
unicode_file = pyStringIO(u'\u1234')
512
510
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
514
def test_put_multi(self):
515
t = self.get_transport()
519
self.assertEqual(2, self.applyDeprecated(zero_eleven,
520
t.put_multi, [('a', StringIO('new\ncontents for\na\n')),
521
('d', StringIO('contents\nfor d\n'))]
523
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd'])),
524
[True, False, False, True])
525
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
526
self.check_transport_contents('contents\nfor d\n', t, 'd')
528
self.assertEqual(2, self.applyDeprecated(zero_eleven,
529
t.put_multi, iter([('a', StringIO('diff\ncontents for\na\n')),
530
('d', StringIO('another contents\nfor d\n'))])
532
self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
533
self.check_transport_contents('another contents\nfor d\n', t, 'd')
535
def test_put_permissions(self):
536
t = self.get_transport()
540
if not t._can_roundtrip_unix_modebits():
541
# Can't roundtrip, so no need to run this test
543
self.applyDeprecated(zero_eleven, t.put, 'mode644',
544
StringIO('test text\n'), mode=0644)
545
self.assertTransportMode(t, 'mode644', 0644)
546
self.applyDeprecated(zero_eleven, t.put, 'mode666',
547
StringIO('test text\n'), mode=0666)
548
self.assertTransportMode(t, 'mode666', 0666)
549
self.applyDeprecated(zero_eleven, t.put, 'mode600',
550
StringIO('test text\n'), mode=0600)
551
self.assertTransportMode(t, 'mode600', 0600)
552
# Yes, you can put a file such that it becomes readonly
553
self.applyDeprecated(zero_eleven, t.put, 'mode400',
554
StringIO('test text\n'), mode=0400)
555
self.assertTransportMode(t, 'mode400', 0400)
556
self.applyDeprecated(zero_eleven, t.put_multi,
557
[('mmode644', StringIO('text\n'))], mode=0644)
558
self.assertTransportMode(t, 'mmode644', 0644)
560
# The default permissions should be based on the current umask
561
umask = osutils.get_umask()
562
self.applyDeprecated(zero_eleven, t.put, 'nomode',
563
StringIO('test text\n'), mode=None)
564
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
566
512
def test_mkdir(self):
567
513
t = self.get_transport()
633
579
t.mkdir('dnomode', mode=None)
634
580
self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
582
def test_opening_a_file_stream_creates_file(self):
583
t = self.get_transport()
586
handle = t.open_write_stream('foo')
588
self.assertEqual('', t.get_bytes('foo'))
592
def test_opening_a_file_stream_can_set_mode(self):
593
t = self.get_transport()
596
if not t._can_roundtrip_unix_modebits():
597
# Can't roundtrip, so no need to run this test
599
def check_mode(name, mode, expected):
600
handle = t.open_write_stream(name, mode=mode)
602
self.assertTransportMode(t, name, expected)
603
check_mode('mode644', 0644, 0644)
604
check_mode('mode666', 0666, 0666)
605
check_mode('mode600', 0600, 0600)
606
# The default permissions should be based on the current umask
607
check_mode('nomode', None, 0666 & ~osutils.get_umask())
636
609
def test_copy_to(self):
637
610
# FIXME: test: same server to same server (partly done)
638
611
# same protocol two servers
684
657
self.assertTransportMode(temp_transport, f, mode)
686
def test_append(self):
687
t = self.get_transport()
691
t.put_bytes('a', 'diff\ncontents for\na\n')
692
t.put_bytes('b', 'contents\nfor b\n')
694
self.assertEqual(20, self.applyDeprecated(zero_eleven,
695
t.append, 'a', StringIO('add\nsome\nmore\ncontents\n')))
697
self.check_transport_contents(
698
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
701
# And we can create new files, too
702
self.assertEqual(0, self.applyDeprecated(zero_eleven,
703
t.append, 'c', StringIO('some text\nfor a missing file\n')))
704
self.check_transport_contents('some text\nfor a missing file\n',
706
659
def test_append_file(self):
707
660
t = self.get_transport()
1527
1485
self.assertEqual(d[2], (0, '0'))
1528
1486
self.assertEqual(d[3], (3, '34'))
1488
def test_get_with_open_write_stream_sees_all_content(self):
1489
t = self.get_transport()
1492
handle = t.open_write_stream('foo')
1495
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1530
1499
def test_get_smart_medium(self):
1531
1500
"""All transports must either give a smart medium, or know they can't.