~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smart_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-02-04 12:56:11 UTC
  • mfrom: (3842.3.22 call_with_body_stream)
  • Revision ID: pqm@pqm.ubuntu.com-20090204125611-m7kqmwruvndk7yrv
Add client and server APIs for streamed request bodies.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1489
1489
        smart_protocol._has_dispatched = True
1490
1490
        smart_protocol.request = _mod_request.SmartServerRequestHandler(
1491
1491
            None, _mod_request.request_handlers, '/')
1492
 
        class FakeCommand(object):
1493
 
            def do_body(cmd, body_bytes):
 
1492
        class FakeCommand(_mod_request.SmartServerRequest):
 
1493
            def do_body(self_cmd, body_bytes):
1494
1494
                self.end_received = True
1495
1495
                self.assertEqual('abcdefg', body_bytes)
1496
1496
                return _mod_request.SuccessfulSmartServerResponse(('ok', ))
1497
 
        smart_protocol.request._command = FakeCommand()
 
1497
        smart_protocol.request._command = FakeCommand(None)
1498
1498
        # Call accept_bytes to make sure that internal state like _body_decoder
1499
1499
        # is initialised.  This test should probably be given a clearer
1500
1500
        # interface to work with that will not cause this inconsistency.
2358
2358
            '\0\0\0\x07' # length prefix
2359
2359
            'l3:ARGe' # ['ARG']
2360
2360
            )
2361
 
        self.assertEqual([('structure', ['ARG'])], event_log)
 
2361
        self.assertEqual([('structure', ('ARG',))], event_log)
2362
2362
 
2363
2363
    def test_decode_multiple_bytes(self):
2364
2364
        """The protocol can decode a multiple 'bytes' message parts."""
2375
2375
            [('bytes', 'first'), ('bytes', 'second')], event_log)
2376
2376
 
2377
2377
 
2378
 
class TestConventionalResponseHandler(tests.TestCase):
 
2378
class TestConventionalResponseHandlerBodyStream(tests.TestCase):
2379
2379
 
2380
2380
    def make_response_handler(self, response_bytes):
2381
2381
        from bzrlib.smart.message import ConventionalResponseHandler
2392
2392
            protocol_decoder, medium_request)
2393
2393
        return response_handler
2394
2394
 
2395
 
    def test_body_stream_interrupted_by_error(self):
 
2395
    def test_interrupted_by_error(self):
2396
2396
        interrupted_body_stream = (
2397
2397
            'oS' # successful response
2398
2398
            's\0\0\0\x02le' # empty args
2409
2409
        exc = self.assertRaises(errors.ErrorFromSmartServer, stream.next)
2410
2410
        self.assertEqual(('error', 'abc'), exc.error_tuple)
2411
2411
 
2412
 
    def test_body_stream_interrupted_by_connection_lost(self):
 
2412
    def test_interrupted_by_connection_lost(self):
2413
2413
        interrupted_body_stream = (
2414
2414
            'oS' # successful response
2415
2415
            's\0\0\0\x02le' # empty args
2427
2427
        self.assertRaises(
2428
2428
            errors.ConnectionReset, response_handler.read_body_bytes)
2429
2429
 
 
2430
    def test_multiple_bytes_parts(self):
 
2431
        multiple_bytes_parts = (
 
2432
            'oS' # successful response
 
2433
            's\0\0\0\x02le' # empty args
 
2434
            'b\0\0\0\x0bSome bytes\n' # some bytes
 
2435
            'b\0\0\0\x0aMore bytes' # more bytes
 
2436
            'e' # message end
 
2437
            )
 
2438
        response_handler = self.make_response_handler(multiple_bytes_parts)
 
2439
        self.assertEqual(
 
2440
            'Some bytes\nMore bytes', response_handler.read_body_bytes())
 
2441
        response_handler = self.make_response_handler(multiple_bytes_parts)
 
2442
        self.assertEqual(
 
2443
            ['Some bytes\n', 'More bytes'],
 
2444
            list(response_handler.read_streamed_body()))
 
2445
 
 
2446
 
 
2447
class FakeResponder(object):
 
2448
 
 
2449
    response_sent = False
 
2450
 
 
2451
    def send_error(self, exc):
 
2452
        raise exc
 
2453
 
 
2454
    def send_response(self, response):
 
2455
        pass
 
2456
 
 
2457
 
 
2458
class TestConventionalRequestHandlerBodyStream(tests.TestCase):
 
2459
    """Tests for ConventionalRequestHandler's handling of request bodies."""
 
2460
 
 
2461
    def make_request_handler(self, request_bytes):
 
2462
        """Make a ConventionalRequestHandler for the given bytes using test
 
2463
        doubles for the request_handler and the responder.
 
2464
        """
 
2465
        from bzrlib.smart.message import ConventionalRequestHandler
 
2466
        request_handler = InstrumentedRequestHandler()
 
2467
        request_handler.response = _mod_request.SuccessfulSmartServerResponse(('arg', 'arg'))
 
2468
        responder = FakeResponder()
 
2469
        message_handler = ConventionalRequestHandler(request_handler, responder)
 
2470
        protocol_decoder = protocol.ProtocolThreeDecoder(message_handler)
 
2471
        # put decoder in desired state (waiting for message parts)
 
2472
        protocol_decoder.state_accept = protocol_decoder._state_accept_expecting_message_part
 
2473
        protocol_decoder.accept_bytes(request_bytes)
 
2474
        return request_handler
 
2475
 
 
2476
    def test_multiple_bytes_parts(self):
 
2477
        """Each bytes part triggers a call to the request_handler's
 
2478
        accept_body method.
 
2479
        """
 
2480
        multiple_bytes_parts = (
 
2481
            's\0\0\0\x07l3:fooe' # args
 
2482
            'b\0\0\0\x0bSome bytes\n' # some bytes
 
2483
            'b\0\0\0\x0aMore bytes' # more bytes
 
2484
            'e' # message end
 
2485
            )
 
2486
        request_handler = self.make_request_handler(multiple_bytes_parts)
 
2487
        accept_body_calls = [
 
2488
            call_info[1] for call_info in request_handler.calls
 
2489
            if call_info[0] == 'accept_body']
 
2490
        self.assertEqual(
 
2491
            ['Some bytes\n', 'More bytes'], accept_body_calls)
 
2492
 
 
2493
    def test_error_flag_after_body(self):
 
2494
        body_then_error = (
 
2495
            's\0\0\0\x07l3:fooe' # request args
 
2496
            'b\0\0\0\x0bSome bytes\n' # some bytes
 
2497
            'b\0\0\0\x0aMore bytes' # more bytes
 
2498
            'oE' # error flag
 
2499
            's\0\0\0\x07l3:bare' # error args
 
2500
            'e' # message end
 
2501
            )
 
2502
        request_handler = self.make_request_handler(body_then_error)
 
2503
        self.assertEqual(
 
2504
            [('post_body_error_received', ('bar',)), ('end_received',)],
 
2505
            request_handler.calls[-2:])
 
2506
 
2430
2507
 
2431
2508
class TestMessageHandlerErrors(tests.TestCase):
2432
2509
    """Tests for v3 that unrecognised (but well-formed) requests/responses are
2476
2553
 
2477
2554
    def __init__(self):
2478
2555
        self.calls = []
2479
 
 
2480
 
    def body_chunk_received(self, chunk_bytes):
2481
 
        self.calls.append(('body_chunk_received', chunk_bytes))
 
2556
        self.finished_reading = False
2482
2557
 
2483
2558
    def no_body_received(self):
2484
2559
        self.calls.append(('no_body_received',))
2485
2560
 
2486
 
    def prefixed_body_received(self, body_bytes):
2487
 
        self.calls.append(('prefixed_body_received', body_bytes))
2488
 
 
2489
2561
    def end_received(self):
2490
2562
        self.calls.append(('end_received',))
 
2563
        self.finished_reading = True
 
2564
 
 
2565
    def dispatch_command(self, cmd, args):
 
2566
        self.calls.append(('dispatch_command', cmd, args))
 
2567
 
 
2568
    def accept_body(self, bytes):
 
2569
        self.calls.append(('accept_body', bytes))
 
2570
 
 
2571
    def end_of_body(self):
 
2572
        self.calls.append(('end_of_body',))
 
2573
        self.finished_reading = True
 
2574
 
 
2575
    def post_body_error_received(self, error_args):
 
2576
        self.calls.append(('post_body_error_received', error_args))
2491
2577
 
2492
2578
 
2493
2579
class StubRequest(object):
2529
2615
        # The message handler has been invoked with all the parts of the
2530
2616
        # trivial response: empty headers, status byte, no args, end.
2531
2617
        self.assertEqual(
2532
 
            [('headers', {}), ('byte', 'S'), ('structure', []), ('end',)],
 
2618
            [('headers', {}), ('byte', 'S'), ('structure', ()), ('end',)],
2533
2619
            response_handler.event_log)
2534
2620
 
2535
2621
    def test_incomplete_message(self):
2643
2729
        self.assertEqual(
2644
2730
            ['accept_bytes', 'finished_writing'], medium_request.calls)
2645
2731
 
 
2732
    def test_call_with_body_stream_smoke_test(self):
 
2733
        """A smoke test for ProtocolThreeRequester.call_with_body_stream.
 
2734
 
 
2735
        This test checks that a particular simple invocation of
 
2736
        call_with_body_stream emits the correct bytes for that invocation.
 
2737
        """
 
2738
        requester, output = self.make_client_encoder_and_output()
 
2739
        requester.set_headers({'header name': 'header value'})
 
2740
        stream = ['chunk 1', 'chunk two']
 
2741
        requester.call_with_body_stream(('one arg',), stream)
 
2742
        self.assertEquals(
 
2743
            'bzr message 3 (bzr 1.6)\n' # protocol version
 
2744
            '\x00\x00\x00\x1fd11:header name12:header valuee' # headers
 
2745
            's\x00\x00\x00\x0bl7:one arge' # args
 
2746
            'b\x00\x00\x00\x07chunk 1' # a prefixed body chunk
 
2747
            'b\x00\x00\x00\x09chunk two' # a prefixed body chunk
 
2748
            'e', # end
 
2749
            output.getvalue())
 
2750
 
 
2751
    def test_call_with_body_stream_empty_stream(self):
 
2752
        """call_with_body_stream with an empty stream."""
 
2753
        requester, output = self.make_client_encoder_and_output()
 
2754
        requester.set_headers({})
 
2755
        stream = []
 
2756
        requester.call_with_body_stream(('one arg',), stream)
 
2757
        self.assertEquals(
 
2758
            'bzr message 3 (bzr 1.6)\n' # protocol version
 
2759
            '\x00\x00\x00\x02de' # headers
 
2760
            's\x00\x00\x00\x0bl7:one arge' # args
 
2761
            # no body chunks
 
2762
            'e', # end
 
2763
            output.getvalue())
 
2764
 
 
2765
    def test_call_with_body_stream_error(self):
 
2766
        """call_with_body_stream will abort the streamed body with an
 
2767
        error if the stream raises an error during iteration.
 
2768
        
 
2769
        The resulting request will still be a complete message.
 
2770
        """
 
2771
        requester, output = self.make_client_encoder_and_output()
 
2772
        requester.set_headers({})
 
2773
        def stream_that_fails():
 
2774
            yield 'aaa'
 
2775
            yield 'bbb'
 
2776
            raise Exception('Boom!')
 
2777
        requester.call_with_body_stream(('one arg',), stream_that_fails())
 
2778
        self.assertEquals(
 
2779
            'bzr message 3 (bzr 1.6)\n' # protocol version
 
2780
            '\x00\x00\x00\x02de' # headers
 
2781
            's\x00\x00\x00\x0bl7:one arge' # args
 
2782
            'b\x00\x00\x00\x03aaa' # body
 
2783
            'b\x00\x00\x00\x03bbb' # more body
 
2784
            'oE' # error flag
 
2785
            's\x00\x00\x00\x09l5:errore' # error args: ('error',)
 
2786
            'e', # end
 
2787
            output.getvalue())
 
2788
 
2646
2789
 
2647
2790
class StubMediumRequest(object):
2648
2791
    """A stub medium request that tracks the number of times accept_bytes is