~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Sidnei da Silva
  • Date: 2009-05-29 14:19:29 UTC
  • mto: (4531.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 4532.
  • Revision ID: sidnei.da.silva@canonical.com-20090529141929-3heywbvj36po72a5
- Add initial config

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Server-side repository related request implmentations."""
 
18
 
 
19
import bz2
 
20
import os
 
21
import Queue
 
22
import struct
 
23
import sys
 
24
import tarfile
 
25
import tempfile
 
26
import threading
 
27
 
 
28
from bzrlib import (
 
29
    errors,
 
30
    graph,
 
31
    osutils,
 
32
    pack,
 
33
    )
 
34
from bzrlib.bzrdir import BzrDir
 
35
from bzrlib.smart.request import (
 
36
    FailedSmartServerResponse,
 
37
    SmartServerRequest,
 
38
    SuccessfulSmartServerResponse,
 
39
    )
 
40
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
 
41
from bzrlib import revision as _mod_revision
 
42
from bzrlib.util import bencode
 
43
from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
 
44
 
 
45
 
 
46
class SmartServerRepositoryRequest(SmartServerRequest):
 
47
    """Common base class for Repository requests."""
 
48
 
 
49
    def do(self, path, *args):
 
50
        """Execute a repository request.
 
51
 
 
52
        All Repository requests take a path to the repository as their first
 
53
        argument.  The repository must be at the exact path given by the
 
54
        client - no searching is done.
 
55
 
 
56
        The actual logic is delegated to self.do_repository_request.
 
57
 
 
58
        :param client_path: The path for the repository as received from the
 
59
            client.
 
60
        :return: A SmartServerResponse from self.do_repository_request().
 
61
        """
 
62
        transport = self.transport_from_client_path(path)
 
63
        bzrdir = BzrDir.open_from_transport(transport)
 
64
        # Save the repository for use with do_body.
 
65
        self._repository = bzrdir.open_repository()
 
66
        return self.do_repository_request(self._repository, *args)
 
67
 
 
68
    def do_repository_request(self, repository, *args):
 
69
        """Override to provide an implementation for a verb."""
 
70
        # No-op for verbs that take bodies (None as a result indicates a body
 
71
        # is expected)
 
72
        return None
 
73
 
 
74
    def recreate_search(self, repository, search_bytes, discard_excess=False):
 
75
        """Recreate a search from its serialised form.
 
76
 
 
77
        :param discard_excess: If True, and the search refers to data we don't
 
78
            have, just silently accept that fact - the verb calling
 
79
            recreate_search trusts that clients will look for missing things
 
80
            they expected and get it from elsewhere.
 
81
        """
 
82
        lines = search_bytes.split('\n')
 
83
        if lines[0] == 'ancestry-of':
 
84
            heads = lines[1:]
 
85
            search_result = graph.PendingAncestryResult(heads, repository)
 
86
            return search_result, None
 
87
        elif lines[0] == 'search':
 
88
            return self.recreate_search_from_recipe(repository, lines[1:],
 
89
                discard_excess=discard_excess)
 
90
        else:
 
91
            return (None, FailedSmartServerResponse(('BadSearch',)))
 
92
 
 
93
    def recreate_search_from_recipe(self, repository, lines,
 
94
        discard_excess=False):
 
95
        """Recreate a specific revision search (vs a from-tip search).
 
96
 
 
97
        :param discard_excess: If True, and the search refers to data we don't
 
98
            have, just silently accept that fact - the verb calling
 
99
            recreate_search trusts that clients will look for missing things
 
100
            they expected and get it from elsewhere.
 
101
        """
 
102
        start_keys = set(lines[0].split(' '))
 
103
        exclude_keys = set(lines[1].split(' '))
 
104
        revision_count = int(lines[2])
 
105
        repository.lock_read()
 
106
        try:
 
107
            search = repository.get_graph()._make_breadth_first_searcher(
 
108
                start_keys)
 
109
            while True:
 
110
                try:
 
111
                    next_revs = search.next()
 
112
                except StopIteration:
 
113
                    break
 
114
                search.stop_searching_any(exclude_keys.intersection(next_revs))
 
115
            search_result = search.get_result()
 
116
            if (not discard_excess and
 
117
                search_result.get_recipe()[3] != revision_count):
 
118
                # we got back a different amount of data than expected, this
 
119
                # gets reported as NoSuchRevision, because less revisions
 
120
                # indicates missing revisions, and more should never happen as
 
121
                # the excludes list considers ghosts and ensures that ghost
 
122
                # filling races are not a problem.
 
123
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
124
            return (search_result, None)
 
125
        finally:
 
126
            repository.unlock()
 
127
 
 
128
 
 
129
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
 
130
    """Calls self.do_readlocked_repository_request."""
 
131
 
 
132
    def do_repository_request(self, repository, *args):
 
133
        """Read lock a repository for do_readlocked_repository_request."""
 
134
        repository.lock_read()
 
135
        try:
 
136
            return self.do_readlocked_repository_request(repository, *args)
 
137
        finally:
 
138
            repository.unlock()
 
139
 
 
140
 
 
141
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
 
142
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
 
143
 
 
144
    no_extra_results = False
 
145
 
 
146
    def do_repository_request(self, repository, *revision_ids):
 
147
        """Get parent details for some revisions.
 
148
 
 
149
        All the parents for revision_ids are returned. Additionally up to 64KB
 
150
        of additional parent data found by performing a breadth first search
 
151
        from revision_ids is returned. The verb takes a body containing the
 
152
        current search state, see do_body for details.
 
153
 
 
154
        If 'include-missing:' is in revision_ids, ghosts encountered in the
 
155
        graph traversal for getting parent data are included in the result with
 
156
        a prefix of 'missing:'.
 
157
 
 
158
        :param repository: The repository to query in.
 
159
        :param revision_ids: The utf8 encoded revision_id to answer for.
 
160
        """
 
161
        self._revision_ids = revision_ids
 
162
        return None # Signal that we want a body.
 
163
 
 
164
    def do_body(self, body_bytes):
 
165
        """Process the current search state and perform the parent lookup.
 
166
 
 
167
        :return: A smart server response where the body contains an utf8
 
168
            encoded flattened list of the parents of the revisions (the same
 
169
            format as Repository.get_revision_graph) which has been bz2
 
170
            compressed.
 
171
        """
 
172
        repository = self._repository
 
173
        repository.lock_read()
 
174
        try:
 
175
            return self._do_repository_request(body_bytes)
 
176
        finally:
 
177
            repository.unlock()
 
178
 
 
179
    def _do_repository_request(self, body_bytes):
 
180
        repository = self._repository
 
181
        revision_ids = set(self._revision_ids)
 
182
        include_missing = 'include-missing:' in revision_ids
 
183
        if include_missing:
 
184
            revision_ids.remove('include-missing:')
 
185
        body_lines = body_bytes.split('\n')
 
186
        search_result, error = self.recreate_search_from_recipe(
 
187
            repository, body_lines)
 
188
        if error is not None:
 
189
            return error
 
190
        # TODO might be nice to start up the search again; but thats not
 
191
        # written or tested yet.
 
192
        client_seen_revs = set(search_result.get_keys())
 
193
        # Always include the requested ids.
 
194
        client_seen_revs.difference_update(revision_ids)
 
195
        lines = []
 
196
        repo_graph = repository.get_graph()
 
197
        result = {}
 
198
        queried_revs = set()
 
199
        size_so_far = 0
 
200
        next_revs = revision_ids
 
201
        first_loop_done = False
 
202
        while next_revs:
 
203
            queried_revs.update(next_revs)
 
204
            parent_map = repo_graph.get_parent_map(next_revs)
 
205
            current_revs = next_revs
 
206
            next_revs = set()
 
207
            for revision_id in current_revs:
 
208
                missing_rev = False
 
209
                parents = parent_map.get(revision_id)
 
210
                if parents is not None:
 
211
                    # adjust for the wire
 
212
                    if parents == (_mod_revision.NULL_REVISION,):
 
213
                        parents = ()
 
214
                    # prepare the next query
 
215
                    next_revs.update(parents)
 
216
                    encoded_id = revision_id
 
217
                else:
 
218
                    missing_rev = True
 
219
                    encoded_id = "missing:" + revision_id
 
220
                    parents = []
 
221
                if (revision_id not in client_seen_revs and
 
222
                    (not missing_rev or include_missing)):
 
223
                    # Client does not have this revision, give it to it.
 
224
                    # add parents to the result
 
225
                    result[encoded_id] = parents
 
226
                    # Approximate the serialized cost of this revision_id.
 
227
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
 
228
            # get all the directly asked for parents, and then flesh out to
 
229
            # 64K (compressed) or so. We do one level of depth at a time to
 
230
            # stay in sync with the client. The 250000 magic number is
 
231
            # estimated compression ratio taken from bzr.dev itself.
 
232
            if self.no_extra_results or (
 
233
                first_loop_done and size_so_far > 250000):
 
234
                next_revs = set()
 
235
                break
 
236
            # don't query things we've already queried
 
237
            next_revs.difference_update(queried_revs)
 
238
            first_loop_done = True
 
239
 
 
240
        # sorting trivially puts lexographically similar revision ids together.
 
241
        # Compression FTW.
 
242
        for revision, parents in sorted(result.items()):
 
243
            lines.append(' '.join((revision, ) + tuple(parents)))
 
244
 
 
245
        return SuccessfulSmartServerResponse(
 
246
            ('ok', ), bz2.compress('\n'.join(lines)))
 
247
 
 
248
 
 
249
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
 
250
 
 
251
    def do_readlocked_repository_request(self, repository, revision_id):
 
252
        """Return the result of repository.get_revision_graph(revision_id).
 
253
 
 
254
        Deprecated as of bzr 1.4, but supported for older clients.
 
255
 
 
256
        :param repository: The repository to query in.
 
257
        :param revision_id: The utf8 encoded revision_id to get a graph from.
 
258
        :return: A smart server response where the body contains an utf8
 
259
            encoded flattened list of the revision graph.
 
260
        """
 
261
        if not revision_id:
 
262
            revision_id = None
 
263
 
 
264
        lines = []
 
265
        graph = repository.get_graph()
 
266
        if revision_id:
 
267
            search_ids = [revision_id]
 
268
        else:
 
269
            search_ids = repository.all_revision_ids()
 
270
        search = graph._make_breadth_first_searcher(search_ids)
 
271
        transitive_ids = set()
 
272
        map(transitive_ids.update, list(search))
 
273
        parent_map = graph.get_parent_map(transitive_ids)
 
274
        revision_graph = _strip_NULL_ghosts(parent_map)
 
275
        if revision_id and revision_id not in revision_graph:
 
276
            # Note that we return an empty body, rather than omitting the body.
 
277
            # This way the client knows that it can always expect to find a body
 
278
            # in the response for this method, even in the error case.
 
279
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
 
280
 
 
281
        for revision, parents in revision_graph.items():
 
282
            lines.append(' '.join((revision, ) + tuple(parents)))
 
283
 
 
284
        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
 
285
 
 
286
 
 
287
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
 
288
 
 
289
    def do_repository_request(self, repository, revision_id):
 
290
        """Return ok if a specific revision is in the repository at path.
 
291
 
 
292
        :param repository: The repository to query in.
 
293
        :param revision_id: The utf8 encoded revision_id to lookup.
 
294
        :return: A smart server response of ('ok', ) if the revision is
 
295
            present.
 
296
        """
 
297
        if repository.has_revision(revision_id):
 
298
            return SuccessfulSmartServerResponse(('yes', ))
 
299
        else:
 
300
            return SuccessfulSmartServerResponse(('no', ))
 
301
 
 
302
 
 
303
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
 
304
 
 
305
    def do_repository_request(self, repository, revid, committers):
 
306
        """Return the result of repository.gather_stats().
 
307
 
 
308
        :param repository: The repository to query in.
 
309
        :param revid: utf8 encoded rev id or an empty string to indicate None
 
310
        :param committers: 'yes' or 'no'.
 
311
 
 
312
        :return: A SmartServerResponse ('ok',), a encoded body looking like
 
313
              committers: 1
 
314
              firstrev: 1234.230 0
 
315
              latestrev: 345.700 3600
 
316
              revisions: 2
 
317
 
 
318
              But containing only fields returned by the gather_stats() call
 
319
        """
 
320
        if revid == '':
 
321
            decoded_revision_id = None
 
322
        else:
 
323
            decoded_revision_id = revid
 
324
        if committers == 'yes':
 
325
            decoded_committers = True
 
326
        else:
 
327
            decoded_committers = None
 
328
        stats = repository.gather_stats(decoded_revision_id, decoded_committers)
 
329
 
 
330
        body = ''
 
331
        if stats.has_key('committers'):
 
332
            body += 'committers: %d\n' % stats['committers']
 
333
        if stats.has_key('firstrev'):
 
334
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
 
335
        if stats.has_key('latestrev'):
 
336
             body += 'latestrev: %.3f %d\n' % stats['latestrev']
 
337
        if stats.has_key('revisions'):
 
338
            body += 'revisions: %d\n' % stats['revisions']
 
339
        if stats.has_key('size'):
 
340
            body += 'size: %d\n' % stats['size']
 
341
 
 
342
        return SuccessfulSmartServerResponse(('ok', ), body)
 
343
 
 
344
 
 
345
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
 
346
 
 
347
    def do_repository_request(self, repository):
 
348
        """Return the result of repository.is_shared().
 
349
 
 
350
        :param repository: The repository to query in.
 
351
        :return: A smart server response of ('yes', ) if the repository is
 
352
            shared, and ('no', ) if it is not.
 
353
        """
 
354
        if repository.is_shared():
 
355
            return SuccessfulSmartServerResponse(('yes', ))
 
356
        else:
 
357
            return SuccessfulSmartServerResponse(('no', ))
 
358
 
 
359
 
 
360
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
 
361
 
 
362
    def do_repository_request(self, repository, token=''):
 
363
        # XXX: this probably should not have a token.
 
364
        if token == '':
 
365
            token = None
 
366
        try:
 
367
            token = repository.lock_write(token=token)
 
368
        except errors.LockContention, e:
 
369
            return FailedSmartServerResponse(('LockContention',))
 
370
        except errors.UnlockableTransport:
 
371
            return FailedSmartServerResponse(('UnlockableTransport',))
 
372
        except errors.LockFailed, e:
 
373
            return FailedSmartServerResponse(('LockFailed',
 
374
                str(e.lock), str(e.why)))
 
375
        if token is not None:
 
376
            repository.leave_lock_in_place()
 
377
        repository.unlock()
 
378
        if token is None:
 
379
            token = ''
 
380
        return SuccessfulSmartServerResponse(('ok', token))
 
381
 
 
382
 
 
383
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
 
384
 
 
385
    def do_repository_request(self, repository, to_network_name):
 
386
        """Get a stream for inserting into a to_format repository.
 
387
 
 
388
        :param repository: The repository to stream from.
 
389
        :param to_network_name: The network name of the format of the target
 
390
            repository.
 
391
        """
 
392
        self._to_format = network_format_registry.get(to_network_name)
 
393
        return None # Signal that we want a body.
 
394
 
 
395
    def do_body(self, body_bytes):
 
396
        repository = self._repository
 
397
        repository.lock_read()
 
398
        try:
 
399
            search_result, error = self.recreate_search(repository, body_bytes,
 
400
                discard_excess=True)
 
401
            if error is not None:
 
402
                repository.unlock()
 
403
                return error
 
404
            source = repository._get_source(self._to_format)
 
405
            stream = source.get_stream(search_result)
 
406
        except Exception:
 
407
            exc_info = sys.exc_info()
 
408
            try:
 
409
                # On non-error, unlocking is done by the body stream handler.
 
410
                repository.unlock()
 
411
            finally:
 
412
                raise exc_info[0], exc_info[1], exc_info[2]
 
413
        return SuccessfulSmartServerResponse(('ok',),
 
414
            body_stream=self.body_stream(stream, repository))
 
415
 
 
416
    def body_stream(self, stream, repository):
 
417
        byte_stream = _stream_to_byte_stream(stream, repository._format)
 
418
        try:
 
419
            for bytes in byte_stream:
 
420
                yield bytes
 
421
        except errors.RevisionNotPresent, e:
 
422
            # This shouldn't be able to happen, but as we don't buffer
 
423
            # everything it can in theory happen.
 
424
            repository.unlock()
 
425
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
426
        else:
 
427
            repository.unlock()
 
428
 
 
429
 
 
430
def _stream_to_byte_stream(stream, src_format):
 
431
    """Convert a record stream to a self delimited byte stream."""
 
432
    pack_writer = pack.ContainerSerialiser()
 
433
    yield pack_writer.begin()
 
434
    yield pack_writer.bytes_record(src_format.network_name(), '')
 
435
    for substream_type, substream in stream:
 
436
        for record in substream:
 
437
            if record.storage_kind in ('chunked', 'fulltext'):
 
438
                serialised = record_to_fulltext_bytes(record)
 
439
            else:
 
440
                serialised = record.get_bytes_as(record.storage_kind)
 
441
            if serialised:
 
442
                # Some streams embed the whole stream into the wire
 
443
                # representation of the first record, which means that
 
444
                # later records have no wire representation: we skip them.
 
445
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
 
446
    yield pack_writer.end()
 
447
 
 
448
 
 
449
def _byte_stream_to_stream(byte_stream):
 
450
    """Convert a byte stream into a format and a stream.
 
451
 
 
452
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
 
453
    :return: (RepositoryFormat, stream_generator)
 
454
    """
 
455
    stream_decoder = pack.ContainerPushParser()
 
456
    def record_stream():
 
457
        """Closure to return the substreams."""
 
458
        # May have fully parsed records already.
 
459
        for record in stream_decoder.read_pending_records():
 
460
            record_names, record_bytes = record
 
461
            record_name, = record_names
 
462
            substream_type = record_name[0]
 
463
            substream = NetworkRecordStream([record_bytes])
 
464
            yield substream_type, substream.read()
 
465
        for bytes in byte_stream:
 
466
            stream_decoder.accept_bytes(bytes)
 
467
            for record in stream_decoder.read_pending_records():
 
468
                record_names, record_bytes = record
 
469
                record_name, = record_names
 
470
                substream_type = record_name[0]
 
471
                substream = NetworkRecordStream([record_bytes])
 
472
                yield substream_type, substream.read()
 
473
    for bytes in byte_stream:
 
474
        stream_decoder.accept_bytes(bytes)
 
475
        for record in stream_decoder.read_pending_records(max=1):
 
476
            record_names, src_format_name = record
 
477
            src_format = network_format_registry.get(src_format_name)
 
478
            return src_format, record_stream()
 
479
 
 
480
 
 
481
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
 
482
 
 
483
    def do_repository_request(self, repository, token):
 
484
        try:
 
485
            repository.lock_write(token=token)
 
486
        except errors.TokenMismatch, e:
 
487
            return FailedSmartServerResponse(('TokenMismatch',))
 
488
        repository.dont_leave_lock_in_place()
 
489
        repository.unlock()
 
490
        return SuccessfulSmartServerResponse(('ok',))
 
491
 
 
492
 
 
493
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
 
494
 
 
495
    def do_repository_request(self, repository, str_bool_new_value):
 
496
        if str_bool_new_value == 'True':
 
497
            new_value = True
 
498
        else:
 
499
            new_value = False
 
500
        repository.set_make_working_trees(new_value)
 
501
        return SuccessfulSmartServerResponse(('ok',))
 
502
 
 
503
 
 
504
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
 
505
    """Get the raw repository files as a tarball.
 
506
 
 
507
    The returned tarball contains a .bzr control directory which in turn
 
508
    contains a repository.
 
509
 
 
510
    This takes one parameter, compression, which currently must be
 
511
    "", "gz", or "bz2".
 
512
 
 
513
    This is used to implement the Repository.copy_content_into operation.
 
514
    """
 
515
 
 
516
    def do_repository_request(self, repository, compression):
 
517
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
 
518
        try:
 
519
            controldir_name = tmp_dirname + '/.bzr'
 
520
            return self._tarfile_response(controldir_name, compression)
 
521
        finally:
 
522
            osutils.rmtree(tmp_dirname)
 
523
 
 
524
    def _copy_to_tempdir(self, from_repo):
 
525
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
 
526
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
 
527
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
 
528
        from_repo.copy_content_into(tmp_repo)
 
529
        return tmp_dirname, tmp_repo
 
530
 
 
531
    def _tarfile_response(self, tmp_dirname, compression):
 
532
        temp = tempfile.NamedTemporaryFile()
 
533
        try:
 
534
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
 
535
            # all finished; write the tempfile out to the network
 
536
            temp.seek(0)
 
537
            return SuccessfulSmartServerResponse(('ok',), temp.read())
 
538
            # FIXME: Don't read the whole thing into memory here; rather stream
 
539
            # it out from the file onto the network. mbp 20070411
 
540
        finally:
 
541
            temp.close()
 
542
 
 
543
    def _tarball_of_dir(self, dirname, compression, ofile):
 
544
        filename = os.path.basename(ofile.name)
 
545
        tarball = tarfile.open(fileobj=ofile, name=filename,
 
546
            mode='w|' + compression)
 
547
        try:
 
548
            # The tarball module only accepts ascii names, and (i guess)
 
549
            # packs them with their 8bit names.  We know all the files
 
550
            # within the repository have ASCII names so the should be safe
 
551
            # to pack in.
 
552
            dirname = dirname.encode(sys.getfilesystemencoding())
 
553
            # python's tarball module includes the whole path by default so
 
554
            # override it
 
555
            if not dirname.endswith('.bzr'):
 
556
                raise ValueError(dirname)
 
557
            tarball.add(dirname, '.bzr') # recursive by default
 
558
        finally:
 
559
            tarball.close()
 
560
 
 
561
 
 
562
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
 
563
    """Insert a record stream from a RemoteSink into a repository.
 
564
 
 
565
    This gets bytes pushed to it by the network infrastructure and turns that
 
566
    into a bytes iterator using a thread. That is then processed by
 
567
    _byte_stream_to_stream.
 
568
 
 
569
    New in 1.14.
 
570
    """
 
571
 
 
572
    def do_repository_request(self, repository, resume_tokens, lock_token):
 
573
        """StreamSink.insert_stream for a remote repository."""
 
574
        repository.lock_write(token=lock_token)
 
575
        self.do_insert_stream_request(repository, resume_tokens)
 
576
 
 
577
    def do_insert_stream_request(self, repository, resume_tokens):
 
578
        tokens = [token for token in resume_tokens.split(' ') if token]
 
579
        self.tokens = tokens
 
580
        self.repository = repository
 
581
        self.queue = Queue.Queue()
 
582
        self.insert_thread = threading.Thread(target=self._inserter_thread)
 
583
        self.insert_thread.start()
 
584
 
 
585
    def do_chunk(self, body_stream_chunk):
 
586
        self.queue.put(body_stream_chunk)
 
587
 
 
588
    def _inserter_thread(self):
 
589
        try:
 
590
            src_format, stream = _byte_stream_to_stream(
 
591
                self.blocking_byte_stream())
 
592
            self.insert_result = self.repository._get_sink().insert_stream(
 
593
                stream, src_format, self.tokens)
 
594
            self.insert_ok = True
 
595
        except:
 
596
            self.insert_exception = sys.exc_info()
 
597
            self.insert_ok = False
 
598
 
 
599
    def blocking_byte_stream(self):
 
600
        while True:
 
601
            bytes = self.queue.get()
 
602
            if bytes is StopIteration:
 
603
                return
 
604
            else:
 
605
                yield bytes
 
606
 
 
607
    def do_end(self):
 
608
        self.queue.put(StopIteration)
 
609
        if self.insert_thread is not None:
 
610
            self.insert_thread.join()
 
611
        if not self.insert_ok:
 
612
            exc_info = self.insert_exception
 
613
            raise exc_info[0], exc_info[1], exc_info[2]
 
614
        write_group_tokens, missing_keys = self.insert_result
 
615
        if write_group_tokens or missing_keys:
 
616
            # bzip needed? missing keys should typically be a small set.
 
617
            # Should this be a streaming body response ?
 
618
            missing_keys = sorted(missing_keys)
 
619
            bytes = bencode.bencode((write_group_tokens, missing_keys))
 
620
            self.repository.unlock()
 
621
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
 
622
        else:
 
623
            self.repository.unlock()
 
624
            return SuccessfulSmartServerResponse(('ok', ))
 
625
 
 
626
 
 
627
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
 
628
    """Insert a record stream from a RemoteSink into an unlocked repository.
 
629
 
 
630
    This is the same as SmartServerRepositoryInsertStreamLocked, except it
 
631
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
 
632
    like pack format) repository.
 
633
 
 
634
    New in 1.13.
 
635
    """
 
636
 
 
637
    def do_repository_request(self, repository, resume_tokens):
 
638
        """StreamSink.insert_stream for a remote repository."""
 
639
        repository.lock_write()
 
640
        self.do_insert_stream_request(repository, resume_tokens)
 
641
 
 
642