40
class SmartClientMediumRequest(object):
    """A request on a SmartClientMedium.

    Each request allows bytes to be provided to it via accept_bytes, and then
    the response bytes to be read via read_bytes.

    For instance::

        request.accept_bytes('123')
        request.finished_writing()
        result = request.read_bytes(3)
        request.finished_reading()

    It is up to the individual SmartClientMedium whether multiple concurrent
    requests can exist. See SmartClientMedium.get_request to obtain instances
    of SmartClientMediumRequest, and the concrete Medium you are using for
    details on concurrency and pipelining.

    The public methods only police the request's state ("writing", then
    "reading", then "done"); the underscore-prefixed helpers perform the
    actual work and raise NotImplementedError in this base class.
    """

    def __init__(self, medium):
        """Construct a SmartClientMediumRequest for the medium medium.

        :param medium: The medium this request will be sent over.
        """
        # Keep a reference to the medium: SmartClientStreamMediumRequest
        # reads self._medium immediately after calling this constructor.
        self._medium = medium
        # we track state by constants - we may want to use the same
        # pattern as BodyReader if it gets more complex.
        # valid states are: "writing", "reading", "done"
        self._state = "writing"

    def accept_bytes(self, bytes):
        """Accept bytes for inclusion in this request.

        This method may not be called after finished_writing() has been
        called. It depends upon the Medium whether or not the bytes will be
        immediately transmitted. Message based Mediums will tend to buffer the
        bytes until finished_writing() is called.

        :param bytes: A bytestring.
        :raises errors.WritingCompleted: if the request is no longer in the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Helper for accept_bytes.

        accept_bytes checks the state of the request to determine if bytes
        should be accepted. After that it hands off to _accept_bytes to do the
        actual acceptance.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Inform the request that all desired data has been read.

        This will remove the request from the pipeline for its medium (if the
        medium supports pipelining) and any further calls to methods on the
        request will raise ReadingCompleted.

        :raises errors.WritingNotComplete: if finished_writing() has not been
            called yet.
        :raises errors.ReadingCompleted: if reading was already finished.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        # Leave the "reading" state before delegating (mirroring
        # finished_writing) so that later read_bytes/finished_reading calls
        # raise ReadingCompleted as documented.
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Helper for finished_reading.

        finished_reading checks the state of the request to determine if
        finished_reading is allowed, and if it is hands off to
        _finished_reading to perform the action.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """Finish the writing phase of this request.

        This will flush all pending data for this request along the medium.
        After calling finished_writing, you may not call accept_bytes anymore.

        :raises errors.WritingCompleted: if the request is no longer in the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Helper for finished_writing.

        finished_writing checks the state of the request to determine if
        finished_writing is allowed, and if it is hands off to
        _finished_writing to perform the action.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from this request's response.

        This method will block and wait for count bytes to be read. It may not
        be invoked until finished_writing() has been called - this is to ensure
        a message-based approach to requests, for compatibility with message
        based mediums like HTTP.

        :param count: The number of bytes to read.
        :return: Whatever _read_bytes returns for count.
        :raises errors.WritingNotComplete: if finished_writing() has not been
            called yet.
        :raises errors.ReadingCompleted: if the request is not in the
            "reading" state.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Helper for read_bytes.

        read_bytes checks the state of the request to determine if bytes
        should be read. After that it hands off to _read_bytes to do the
        actual read.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError(self._read_bytes)
156
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """A SmartClientMediumRequest that works with a SmartClientStreamMedium.

    Every operation is forwarded to the medium; the medium's
    _current_request slot is used to allow only one live request at a time.
    """

    def __init__(self, medium):
        """Create the request and claim the medium's single request slot.

        :param medium: The stream medium this request operates on.
        :raises errors.TooManyConcurrentRequests: if the medium already has a
            current request.
        """
        SmartClientMediumRequest.__init__(self, medium)
        # check that we are safe concurrency wise. If some streams start
        # allowing concurrent requests - i.e. via multiplexing - then this
        # check should be moved to SmartClientStreamMedium.get_request,
        # and the setting/unsetting of _current_request likewise moved into
        # that class : but it's unneeded overhead for now. RBC 20060922
        stream = self._medium
        if stream._current_request is not None:
            raise errors.TooManyConcurrentRequests(stream)
        stream._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        The bytes are handed straight to self._medium._accept_bytes because
        this request operates on the medium's stream.
        """
        stream = self._medium
        stream._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        Resets _current_request on self._medium to None so that a new
        request can be created.
        """
        stream = self._medium
        # Only the request occupying the slot may release it.
        assert stream._current_request is self
        stream._current_request = None

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        Calls self._medium._flush to ensure all bytes are transmitted.
        """
        stream = self._medium
        stream._flush()

    def _read_bytes(self, count):
        """See SmartClientMediumRequest._read_bytes.

        The read is delegated to self._medium._read_bytes because this
        request operates on the medium's stream.
        """
        stream = self._medium
        return stream._read_bytes(count)
203
40
class SmartServerStreamMedium(object):
204
41
"""Handles smart commands coming over a stream.
336
173
self._out.write(bytes)
176
class SmartClientMediumRequest(object):
    """A request on a SmartClientMedium.

    Each request allows bytes to be provided to it via accept_bytes, and then
    the response bytes to be read via read_bytes.

    For instance::

        request.accept_bytes('123')
        request.finished_writing()
        result = request.read_bytes(3)
        request.finished_reading()

    It is up to the individual SmartClientMedium whether multiple concurrent
    requests can exist. See SmartClientMedium.get_request to obtain instances
    of SmartClientMediumRequest, and the concrete Medium you are using for
    details on concurrency and pipelining.

    The public methods only police the request's state ("writing", then
    "reading", then "done"); the underscore-prefixed helpers perform the
    actual work and raise NotImplementedError in this base class.
    """

    def __init__(self, medium):
        """Construct a SmartClientMediumRequest for the medium medium.

        :param medium: The medium this request will be sent over.
        """
        self._medium = medium
        # we track state by constants - we may want to use the same
        # pattern as BodyReader if it gets more complex.
        # valid states are: "writing", "reading", "done"
        self._state = "writing"

    def accept_bytes(self, bytes):
        """Accept bytes for inclusion in this request.

        This method may not be called after finished_writing() has been
        called. It depends upon the Medium whether or not the bytes will be
        immediately transmitted. Message based Mediums will tend to buffer the
        bytes until finished_writing() is called.

        :param bytes: A bytestring.
        :raises errors.WritingCompleted: if the request is no longer in the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Helper for accept_bytes.

        accept_bytes checks the state of the request to determine if bytes
        should be accepted. After that it hands off to _accept_bytes to do the
        actual acceptance.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Inform the request that all desired data has been read.

        This will remove the request from the pipeline for its medium (if the
        medium supports pipelining) and any further calls to methods on the
        request will raise ReadingCompleted.

        :raises errors.WritingNotComplete: if finished_writing() has not been
            called yet.
        :raises errors.ReadingCompleted: if reading was already finished.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        # Leave the "reading" state before delegating (mirroring
        # finished_writing) so that later read_bytes/finished_reading calls
        # raise ReadingCompleted as documented.
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Helper for finished_reading.

        finished_reading checks the state of the request to determine if
        finished_reading is allowed, and if it is hands off to
        _finished_reading to perform the action.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """Finish the writing phase of this request.

        This will flush all pending data for this request along the medium.
        After calling finished_writing, you may not call accept_bytes anymore.

        :raises errors.WritingCompleted: if the request is no longer in the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Helper for finished_writing.

        finished_writing checks the state of the request to determine if
        finished_writing is allowed, and if it is hands off to
        _finished_writing to perform the action.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from this request's response.

        This method will block and wait for count bytes to be read. It may not
        be invoked until finished_writing() has been called - this is to ensure
        a message-based approach to requests, for compatibility with message
        based mediums like HTTP.

        :param count: The number of bytes to read.
        :return: Whatever _read_bytes returns for count.
        :raises errors.WritingNotComplete: if finished_writing() has not been
            called yet.
        :raises errors.ReadingCompleted: if the request is not in the
            "reading" state.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Helper for read_bytes.

        read_bytes checks the state of the request to determine if bytes
        should be read. After that it hands off to _read_bytes to do the
        actual read.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError(self._read_bytes)
339
292
class SmartClientMedium(object):
340
293
"""Smart client is a medium for sending smart protocol requests over."""
523
476
raise errors.MediumNotConnected(self)
524
477
return self._socket.recv(count)
480
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """A SmartClientMediumRequest that works with a SmartClientStreamMedium.

    Every operation is forwarded to the medium; the medium's
    _current_request slot is used to allow only one live request at a time.
    """

    def __init__(self, medium):
        """Create the request and claim the medium's single request slot.

        :param medium: The stream medium this request operates on.
        :raises errors.TooManyConcurrentRequests: if the medium already has a
            current request.
        """
        SmartClientMediumRequest.__init__(self, medium)
        # check that we are safe concurrency wise. If some streams start
        # allowing concurrent requests - i.e. via multiplexing - then this
        # check should be moved to SmartClientStreamMedium.get_request,
        # and the setting/unsetting of _current_request likewise moved into
        # that class : but it's unneeded overhead for now. RBC 20060922
        stream = self._medium
        if stream._current_request is not None:
            raise errors.TooManyConcurrentRequests(stream)
        stream._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        The bytes are handed straight to self._medium._accept_bytes because
        this request operates on the medium's stream.
        """
        stream = self._medium
        stream._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        Resets _current_request on self._medium to None so that a new
        request can be created.
        """
        stream = self._medium
        # Only the request occupying the slot may release it.
        assert stream._current_request is self
        stream._current_request = None

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        Calls self._medium._flush to ensure all bytes are transmitted.
        """
        stream = self._medium
        stream._flush()

    def _read_bytes(self, count):
        """See SmartClientMediumRequest._read_bytes.

        The read is delegated to self._medium._read_bytes because this
        request operates on the medium's stream.
        """
        stream = self._medium
        return stream._read_bytes(count)