111
112
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
112
113
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
115
def test_get_bytes(self):
116
t = self.get_transport()
118
files = ['a', 'b', 'e', 'g']
119
contents = ['contents of a\n',
124
self.build_tree(files, transport=t, line_endings='binary')
125
self.check_transport_contents('contents of a\n', t, 'a')
127
for content, fname in zip(contents, files):
128
self.assertEqual(content, t.get_bytes(fname))
130
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
114
132
def test_put(self):
115
133
t = self.get_transport()
117
135
if t.is_readonly():
118
self.assertRaises(TransportNotPossible,
119
t.put, 'a', 'some text for a\n')
122
t.put('a', StringIO('some text for a\n'))
123
self.failUnless(t.has('a'))
124
self.check_transport_contents('some text for a\n', t, 'a')
125
# Make sure 'has' is updated
126
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
127
[True, False, False, False, False])
128
# Put also replaces contents
129
self.assertEqual(t.put_multi([('a', StringIO('new\ncontents for\na\n')),
130
('d', StringIO('contents\nfor d\n'))]),
132
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
133
[True, False, False, True, False])
138
self.applyDeprecated(zero_eleven, t.put, 'a', 'string\ncontents\n')
139
self.check_transport_contents('string\ncontents\n', t, 'a')
141
self.applyDeprecated(zero_eleven,
142
t.put, 'b', StringIO('file-like\ncontents\n'))
143
self.check_transport_contents('file-like\ncontents\n', t, 'b')
145
def test_put_bytes(self):
146
t = self.get_transport()
149
self.assertRaises(TransportNotPossible,
150
t.put_bytes, 'a', 'some text for a\n')
153
t.put_bytes('a', 'some text for a\n')
154
self.failUnless(t.has('a'))
155
self.check_transport_contents('some text for a\n', t, 'a')
157
# The contents should be overwritten
158
t.put_bytes('a', 'new text for a\n')
159
self.check_transport_contents('new text for a\n', t, 'a')
161
self.assertRaises(NoSuchFile,
162
t.put_bytes, 'path/doesnt/exist/c', 'contents')
164
def test_put_bytes_non_atomic(self):
165
t = self.get_transport()
168
self.assertRaises(TransportNotPossible,
169
t.put_bytes_non_atomic, 'a', 'some text for a\n')
172
self.failIf(t.has('a'))
173
t.put_bytes_non_atomic('a', 'some text for a\n')
174
self.failUnless(t.has('a'))
175
self.check_transport_contents('some text for a\n', t, 'a')
176
# Put also replaces contents
177
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
178
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
180
# Make sure we can create another file
181
t.put_bytes_non_atomic('d', 'contents for\nd\n')
182
# And overwrite 'a' with empty contents
183
t.put_bytes_non_atomic('a', '')
184
self.check_transport_contents('contents for\nd\n', t, 'd')
185
self.check_transport_contents('', t, 'a')
187
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
189
# Now test the create_parent flag
190
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
192
self.failIf(t.has('dir/a'))
193
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
194
create_parent_dir=True)
195
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
197
# But we still get NoSuchFile if we can't make the parent dir
198
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
200
create_parent_dir=True)
202
def test_put_bytes_permissions(self):
203
t = self.get_transport()
207
if not t._can_roundtrip_unix_modebits():
208
# Can't roundtrip, so no need to run this test
210
t.put_bytes('mode644', 'test text\n', mode=0644)
211
self.assertTransportMode(t, 'mode644', 0644)
212
t.put_bytes('mode666', 'test text\n', mode=0666)
213
self.assertTransportMode(t, 'mode666', 0666)
214
t.put_bytes('mode600', 'test text\n', mode=0600)
215
self.assertTransportMode(t, 'mode600', 0600)
216
# Yes, you can put_bytes a file such that it becomes readonly
217
t.put_bytes('mode400', 'test text\n', mode=0400)
218
self.assertTransportMode(t, 'mode400', 0400)
220
# The default permissions should be based on the current umask
221
umask = osutils.get_umask()
222
t.put_bytes('nomode', 'test text\n', mode=None)
223
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
225
def test_put_bytes_non_atomic_permissions(self):
226
t = self.get_transport()
230
if not t._can_roundtrip_unix_modebits():
231
# Can't roundtrip, so no need to run this test
233
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
234
self.assertTransportMode(t, 'mode644', 0644)
235
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
236
self.assertTransportMode(t, 'mode666', 0666)
237
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
238
self.assertTransportMode(t, 'mode600', 0600)
239
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
240
self.assertTransportMode(t, 'mode400', 0400)
242
# The default permissions should be based on the current umask
243
umask = osutils.get_umask()
244
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
245
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
247
def test_put_file(self):
248
t = self.get_transport()
251
self.assertRaises(TransportNotPossible,
252
t.put_file, 'a', StringIO('some text for a\n'))
255
t.put_file('a', StringIO('some text for a\n'))
256
self.failUnless(t.has('a'))
257
self.check_transport_contents('some text for a\n', t, 'a')
258
# Put also replaces contents
259
t.put_file('a', StringIO('new\ncontents for\na\n'))
260
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
261
self.assertRaises(NoSuchFile,
262
t.put_file, 'path/doesnt/exist/c',
263
StringIO('contents'))
265
def test_put_file_non_atomic(self):
266
t = self.get_transport()
269
self.assertRaises(TransportNotPossible,
270
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
273
self.failIf(t.has('a'))
274
t.put_file_non_atomic('a', StringIO('some text for a\n'))
275
self.failUnless(t.has('a'))
276
self.check_transport_contents('some text for a\n', t, 'a')
277
# Put also replaces contents
278
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
279
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
281
# Make sure we can create another file
282
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
283
# And overwrite 'a' with empty contents
284
t.put_file_non_atomic('a', StringIO(''))
285
self.check_transport_contents('contents for\nd\n', t, 'd')
286
self.check_transport_contents('', t, 'a')
288
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
289
StringIO('contents\n'))
290
# Now test the create_parent flag
291
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
292
StringIO('contents\n'))
293
self.failIf(t.has('dir/a'))
294
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
295
create_parent_dir=True)
296
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
298
# But we still get NoSuchFile if we can't make the parent dir
299
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
300
StringIO('contents\n'),
301
create_parent_dir=True)
303
def test_put_file_permissions(self):
305
t = self.get_transport()
309
if not t._can_roundtrip_unix_modebits():
310
# Can't roundtrip, so no need to run this test
312
t.put_file('mode644', StringIO('test text\n'), mode=0644)
313
self.assertTransportMode(t, 'mode644', 0644)
314
t.put_file('mode666', StringIO('test text\n'), mode=0666)
315
self.assertTransportMode(t, 'mode666', 0666)
316
t.put_file('mode600', StringIO('test text\n'), mode=0600)
317
self.assertTransportMode(t, 'mode600', 0600)
318
# Yes, you can put a file such that it becomes readonly
319
t.put_file('mode400', StringIO('test text\n'), mode=0400)
320
self.assertTransportMode(t, 'mode400', 0400)
322
# XXX: put_multi is deprecated, so do we really care anymore?
323
self.applyDeprecated(zero_eleven, t.put_multi,
324
[('mmode644', StringIO('text\n'))], mode=0644)
325
self.assertTransportMode(t, 'mmode644', 0644)
327
# The default permissions should be based on the current umask
328
umask = osutils.get_umask()
329
t.put_file('nomode', StringIO('test text\n'), mode=None)
330
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
332
def test_put_file_non_atomic_permissions(self):
333
t = self.get_transport()
337
if not t._can_roundtrip_unix_modebits():
338
# Can't roundtrip, so no need to run this test
340
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
341
self.assertTransportMode(t, 'mode644', 0644)
342
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
343
self.assertTransportMode(t, 'mode666', 0666)
344
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
345
self.assertTransportMode(t, 'mode600', 0600)
346
# Yes, you can put_file_non_atomic a file such that it becomes readonly
347
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
348
self.assertTransportMode(t, 'mode400', 0400)
350
# The default permissions should be based on the current umask
351
umask = osutils.get_umask()
352
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
353
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
355
def test_put_multi(self):
356
t = self.get_transport()
360
self.assertEqual(2, self.applyDeprecated(zero_eleven,
361
t.put_multi, [('a', StringIO('new\ncontents for\na\n')),
362
('d', StringIO('contents\nfor d\n'))]
364
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd'])),
365
[True, False, False, True])
134
366
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
135
367
self.check_transport_contents('contents\nfor d\n', t, 'd')
138
t.put_multi(iter([('a', StringIO('diff\ncontents for\na\n')),
139
('d', StringIO('another contents\nfor d\n'))])),
369
self.assertEqual(2, self.applyDeprecated(zero_eleven,
370
t.put_multi, iter([('a', StringIO('diff\ncontents for\na\n')),
371
('d', StringIO('another contents\nfor d\n'))])
141
373
self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
142
374
self.check_transport_contents('another contents\nfor d\n', t, 'd')
144
self.assertRaises(NoSuchFile,
145
t.put, 'path/doesnt/exist/c', StringIO('contents'))
147
def test_put_permissions(self):
148
t = self.get_transport()
152
if not t._can_roundtrip_unix_modebits():
153
# Can't roundtrip, so no need to run this test
155
t.put('mode644', StringIO('test text\n'), mode=0644)
156
self.assertTransportMode(t, 'mode644', 0644)
157
t.put('mode666', StringIO('test text\n'), mode=0666)
158
self.assertTransportMode(t, 'mode666', 0666)
159
t.put('mode600', StringIO('test text\n'), mode=0600)
160
self.assertTransportMode(t, 'mode600', 0600)
161
# Yes, you can put a file such that it becomes readonly
162
t.put('mode400', StringIO('test text\n'), mode=0400)
163
self.assertTransportMode(t, 'mode400', 0400)
164
t.put_multi([('mmode644', StringIO('text\n'))], mode=0644)
165
self.assertTransportMode(t, 'mmode644', 0644)
167
# The default permissions should be based on the current umask
168
umask = osutils.get_umask()
169
t.put('nomode', StringIO('test text\n'), mode=None)
170
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
172
376
def test_mkdir(self):
173
377
t = self.get_transport()
295
498
t = self.get_transport()
297
500
if t.is_readonly():
298
open('a', 'wb').write('diff\ncontents for\na\n')
299
open('b', 'wb').write('contents\nfor b\n')
302
('a', StringIO('diff\ncontents for\na\n')),
303
('b', StringIO('contents\nfor b\n'))
307
self.assertRaises(TransportNotPossible,
308
t.append, 'a', 'add\nsome\nmore\ncontents\n')
309
_append('a', StringIO('add\nsome\nmore\ncontents\n'))
312
t.append('a', StringIO('add\nsome\nmore\ncontents\n')))
314
self.check_transport_contents(
315
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
319
self.assertRaises(TransportNotPossible,
321
[('a', 'and\nthen\nsome\nmore\n'),
322
('b', 'some\nmore\nfor\nb\n')])
323
_append('a', StringIO('and\nthen\nsome\nmore\n'))
324
_append('b', StringIO('some\nmore\nfor\nb\n'))
326
self.assertEqual((43, 15),
327
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
328
('b', StringIO('some\nmore\nfor\nb\n'))]))
502
t.put_bytes('a', 'diff\ncontents for\na\n')
503
t.put_bytes('b', 'contents\nfor b\n')
505
self.assertEqual(20, self.applyDeprecated(zero_eleven,
506
t.append, 'a', StringIO('add\nsome\nmore\ncontents\n')))
508
self.check_transport_contents(
509
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
512
# And we can create new files, too
513
self.assertEqual(0, self.applyDeprecated(zero_eleven,
514
t.append, 'c', StringIO('some text\nfor a missing file\n')))
515
self.check_transport_contents('some text\nfor a missing file\n',
517
def test_append_file(self):
518
t = self.get_transport()
521
self.assertRaises(TransportNotPossible,
522
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
524
t.put_bytes('a', 'diff\ncontents for\na\n')
525
t.put_bytes('b', 'contents\nfor b\n')
528
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
530
self.check_transport_contents(
531
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
534
# a file with no parent should fail..
535
self.assertRaises(NoSuchFile,
536
t.append_file, 'missing/path', StringIO('content'))
538
# And we can create new files, too
540
t.append_file('c', StringIO('some text\nfor a missing file\n')))
541
self.check_transport_contents('some text\nfor a missing file\n',
544
def test_append_bytes(self):
545
t = self.get_transport()
548
self.assertRaises(TransportNotPossible,
549
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
552
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
553
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
556
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
558
self.check_transport_contents(
559
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
562
# a file with no parent should fail..
563
self.assertRaises(NoSuchFile,
564
t.append_bytes, 'missing/path', 'content')
566
def test_append_multi(self):
567
t = self.get_transport()
571
t.put_bytes('a', 'diff\ncontents for\na\n'
572
'add\nsome\nmore\ncontents\n')
573
t.put_bytes('b', 'contents\nfor b\n')
575
self.assertEqual((43, 15),
576
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
577
('b', StringIO('some\nmore\nfor\nb\n'))]))
329
579
self.check_transport_contents(
330
580
'diff\ncontents for\na\n'
331
581
'add\nsome\nmore\ncontents\n'
372
612
'a little bit more\n'
373
613
'some text in a\n',
375
self.check_transport_contents('some text\nfor a missing file\n',
377
615
self.check_transport_contents('missing file r\n', t, 'd')
379
# a file with no parent should fail..
380
if not t.is_readonly():
381
self.assertRaises(NoSuchFile,
382
t.append, 'missing/path',
385
def test_append_file(self):
386
t = self.get_transport()
389
('f1', StringIO('this is a string\nand some more stuff\n')),
390
('f2', StringIO('here is some text\nand a bit more\n')),
391
('f3', StringIO('some text for the\nthird file created\n')),
392
('f4', StringIO('this is a string\nand some more stuff\n')),
393
('f5', StringIO('here is some text\nand a bit more\n')),
394
('f6', StringIO('some text for the\nthird file created\n'))
398
for f, val in contents:
399
open(f, 'wb').write(val.read())
401
t.put_multi(contents)
403
a1 = StringIO('appending to\none\n')
411
self.check_transport_contents(
412
'this is a string\nand some more stuff\n'
413
'appending to\none\n',
416
a2 = StringIO('adding more\ntext to two\n')
417
a3 = StringIO('some garbage\nto put in three\n')
423
t.append_multi([('f2', a2), ('f3', a3)])
427
self.check_transport_contents(
428
'here is some text\nand a bit more\n'
429
'adding more\ntext to two\n',
431
self.check_transport_contents(
432
'some text for the\nthird file created\n'
433
'some garbage\nto put in three\n',
436
# Test that an actual file object can be used with put
445
self.check_transport_contents(
446
'this is a string\nand some more stuff\n'
447
'this is a string\nand some more stuff\n'
448
'appending to\none\n',
457
t.append_multi([('f5', a5), ('f6', a6)])
461
self.check_transport_contents(
462
'here is some text\nand a bit more\n'
463
'here is some text\nand a bit more\n'
464
'adding more\ntext to two\n',
466
self.check_transport_contents(
467
'some text for the\nthird file created\n'
468
'some text for the\nthird file created\n'
469
'some garbage\nto put in three\n',
481
t.append_multi([('a', a6), ('d', a7)])
483
self.check_transport_contents(t.get('f2').read(), t, 'c')
484
self.check_transport_contents(t.get('f3').read(), t, 'd')
486
def test_append_mode(self):
617
def test_append_file_mode(self):
618
"""Check that append accepts a mode parameter"""
487
619
# check append accepts a mode
488
620
t = self.get_transport()
489
621
if t.is_readonly():
491
t.append('f', StringIO('f'), mode=None)
622
self.assertRaises(TransportNotPossible,
623
t.append_file, 'f', StringIO('f'), mode=None)
625
t.append_file('f', StringIO('f'), mode=None)
627
def test_append_bytes_mode(self):
628
# check append_bytes accepts a mode
629
t = self.get_transport()
631
self.assertRaises(TransportNotPossible,
632
t.append_bytes, 'f', 'f', mode=None)
634
t.append_bytes('f', 'f', mode=None)
493
636
def test_delete(self):
494
637
# TODO: Test Transport.delete