~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Martin Pool
  • Date: 2005-05-17 07:51:51 UTC
  • Revision ID: mbp@sourcefrog.net-20050517075151-b64853a6a38a6225
- export bzrlib.find_branch

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006, 2007 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Server-side repository related request implmentations."""
18
 
 
19
 
import bz2
20
 
import os
21
 
import Queue
22
 
import struct
23
 
import sys
24
 
import tarfile
25
 
import tempfile
26
 
import threading
27
 
 
28
 
from bzrlib import (
29
 
    errors,
30
 
    graph,
31
 
    osutils,
32
 
    pack,
33
 
    )
34
 
from bzrlib.bzrdir import BzrDir
35
 
from bzrlib.smart.request import (
36
 
    FailedSmartServerResponse,
37
 
    SmartServerRequest,
38
 
    SuccessfulSmartServerResponse,
39
 
    )
40
 
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
41
 
from bzrlib import revision as _mod_revision
42
 
from bzrlib.util import bencode
43
 
from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
44
 
 
45
 
 
46
 
class SmartServerRepositoryRequest(SmartServerRequest):
47
 
    """Common base class for Repository requests."""
48
 
 
49
 
    def do(self, path, *args):
50
 
        """Execute a repository request.
51
 
 
52
 
        All Repository requests take a path to the repository as their first
53
 
        argument.  The repository must be at the exact path given by the
54
 
        client - no searching is done.
55
 
 
56
 
        The actual logic is delegated to self.do_repository_request.
57
 
 
58
 
        :param client_path: The path for the repository as received from the
59
 
            client.
60
 
        :return: A SmartServerResponse from self.do_repository_request().
61
 
        """
62
 
        transport = self.transport_from_client_path(path)
63
 
        bzrdir = BzrDir.open_from_transport(transport)
64
 
        # Save the repository for use with do_body.
65
 
        self._repository = bzrdir.open_repository()
66
 
        return self.do_repository_request(self._repository, *args)
67
 
 
68
 
    def do_repository_request(self, repository, *args):
69
 
        """Override to provide an implementation for a verb."""
70
 
        # No-op for verbs that take bodies (None as a result indicates a body
71
 
        # is expected)
72
 
        return None
73
 
 
74
 
    def recreate_search(self, repository, search_bytes):
75
 
        lines = search_bytes.split('\n')
76
 
        if lines[0] == 'ancestry-of':
77
 
            heads = lines[1:]
78
 
            search_result = graph.PendingAncestryResult(heads, repository)
79
 
            return search_result, None
80
 
        elif lines[0] == 'search':
81
 
            return self.recreate_search_from_recipe(repository, lines[1:])
82
 
        else:
83
 
            return (None, FailedSmartServerResponse(('BadSearch',)))
84
 
 
85
 
    def recreate_search_from_recipe(self, repository, lines):
86
 
        start_keys = set(lines[0].split(' '))
87
 
        exclude_keys = set(lines[1].split(' '))
88
 
        revision_count = int(lines[2])
89
 
        repository.lock_read()
90
 
        try:
91
 
            search = repository.get_graph()._make_breadth_first_searcher(
92
 
                start_keys)
93
 
            while True:
94
 
                try:
95
 
                    next_revs = search.next()
96
 
                except StopIteration:
97
 
                    break
98
 
                search.stop_searching_any(exclude_keys.intersection(next_revs))
99
 
            search_result = search.get_result()
100
 
            if search_result.get_recipe()[3] != revision_count:
101
 
                # we got back a different amount of data than expected, this
102
 
                # gets reported as NoSuchRevision, because less revisions
103
 
                # indicates missing revisions, and more should never happen as
104
 
                # the excludes list considers ghosts and ensures that ghost
105
 
                # filling races are not a problem.
106
 
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
107
 
            return (search_result, None)
108
 
        finally:
109
 
            repository.unlock()
110
 
 
111
 
 
112
 
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
113
 
    """Calls self.do_readlocked_repository_request."""
114
 
 
115
 
    def do_repository_request(self, repository, *args):
116
 
        """Read lock a repository for do_readlocked_repository_request."""
117
 
        repository.lock_read()
118
 
        try:
119
 
            return self.do_readlocked_repository_request(repository, *args)
120
 
        finally:
121
 
            repository.unlock()
122
 
 
123
 
 
124
 
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
125
 
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
126
 
 
127
 
    no_extra_results = False
128
 
 
129
 
    def do_repository_request(self, repository, *revision_ids):
130
 
        """Get parent details for some revisions.
131
 
 
132
 
        All the parents for revision_ids are returned. Additionally up to 64KB
133
 
        of additional parent data found by performing a breadth first search
134
 
        from revision_ids is returned. The verb takes a body containing the
135
 
        current search state, see do_body for details.
136
 
 
137
 
        If 'include-missing:' is in revision_ids, ghosts encountered in the
138
 
        graph traversal for getting parent data are included in the result with
139
 
        a prefix of 'missing:'.
140
 
 
141
 
        :param repository: The repository to query in.
142
 
        :param revision_ids: The utf8 encoded revision_id to answer for.
143
 
        """
144
 
        self._revision_ids = revision_ids
145
 
        return None # Signal that we want a body.
146
 
 
147
 
    def do_body(self, body_bytes):
148
 
        """Process the current search state and perform the parent lookup.
149
 
 
150
 
        :return: A smart server response where the body contains an utf8
151
 
            encoded flattened list of the parents of the revisions (the same
152
 
            format as Repository.get_revision_graph) which has been bz2
153
 
            compressed.
154
 
        """
155
 
        repository = self._repository
156
 
        repository.lock_read()
157
 
        try:
158
 
            return self._do_repository_request(body_bytes)
159
 
        finally:
160
 
            repository.unlock()
161
 
 
162
 
    def _do_repository_request(self, body_bytes):
163
 
        repository = self._repository
164
 
        revision_ids = set(self._revision_ids)
165
 
        include_missing = 'include-missing:' in revision_ids
166
 
        if include_missing:
167
 
            revision_ids.remove('include-missing:')
168
 
        body_lines = body_bytes.split('\n')
169
 
        search_result, error = self.recreate_search_from_recipe(
170
 
            repository, body_lines)
171
 
        if error is not None:
172
 
            return error
173
 
        # TODO might be nice to start up the search again; but thats not
174
 
        # written or tested yet.
175
 
        client_seen_revs = set(search_result.get_keys())
176
 
        # Always include the requested ids.
177
 
        client_seen_revs.difference_update(revision_ids)
178
 
        lines = []
179
 
        repo_graph = repository.get_graph()
180
 
        result = {}
181
 
        queried_revs = set()
182
 
        size_so_far = 0
183
 
        next_revs = revision_ids
184
 
        first_loop_done = False
185
 
        while next_revs:
186
 
            queried_revs.update(next_revs)
187
 
            parent_map = repo_graph.get_parent_map(next_revs)
188
 
            current_revs = next_revs
189
 
            next_revs = set()
190
 
            for revision_id in current_revs:
191
 
                missing_rev = False
192
 
                parents = parent_map.get(revision_id)
193
 
                if parents is not None:
194
 
                    # adjust for the wire
195
 
                    if parents == (_mod_revision.NULL_REVISION,):
196
 
                        parents = ()
197
 
                    # prepare the next query
198
 
                    next_revs.update(parents)
199
 
                    encoded_id = revision_id
200
 
                else:
201
 
                    missing_rev = True
202
 
                    encoded_id = "missing:" + revision_id
203
 
                    parents = []
204
 
                if (revision_id not in client_seen_revs and
205
 
                    (not missing_rev or include_missing)):
206
 
                    # Client does not have this revision, give it to it.
207
 
                    # add parents to the result
208
 
                    result[encoded_id] = parents
209
 
                    # Approximate the serialized cost of this revision_id.
210
 
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
211
 
            # get all the directly asked for parents, and then flesh out to
212
 
            # 64K (compressed) or so. We do one level of depth at a time to
213
 
            # stay in sync with the client. The 250000 magic number is
214
 
            # estimated compression ratio taken from bzr.dev itself.
215
 
            if self.no_extra_results or (
216
 
                first_loop_done and size_so_far > 250000):
217
 
                next_revs = set()
218
 
                break
219
 
            # don't query things we've already queried
220
 
            next_revs.difference_update(queried_revs)
221
 
            first_loop_done = True
222
 
 
223
 
        # sorting trivially puts lexographically similar revision ids together.
224
 
        # Compression FTW.
225
 
        for revision, parents in sorted(result.items()):
226
 
            lines.append(' '.join((revision, ) + tuple(parents)))
227
 
 
228
 
        return SuccessfulSmartServerResponse(
229
 
            ('ok', ), bz2.compress('\n'.join(lines)))
230
 
 
231
 
 
232
 
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
233
 
 
234
 
    def do_readlocked_repository_request(self, repository, revision_id):
235
 
        """Return the result of repository.get_revision_graph(revision_id).
236
 
 
237
 
        Deprecated as of bzr 1.4, but supported for older clients.
238
 
 
239
 
        :param repository: The repository to query in.
240
 
        :param revision_id: The utf8 encoded revision_id to get a graph from.
241
 
        :return: A smart server response where the body contains an utf8
242
 
            encoded flattened list of the revision graph.
243
 
        """
244
 
        if not revision_id:
245
 
            revision_id = None
246
 
 
247
 
        lines = []
248
 
        graph = repository.get_graph()
249
 
        if revision_id:
250
 
            search_ids = [revision_id]
251
 
        else:
252
 
            search_ids = repository.all_revision_ids()
253
 
        search = graph._make_breadth_first_searcher(search_ids)
254
 
        transitive_ids = set()
255
 
        map(transitive_ids.update, list(search))
256
 
        parent_map = graph.get_parent_map(transitive_ids)
257
 
        revision_graph = _strip_NULL_ghosts(parent_map)
258
 
        if revision_id and revision_id not in revision_graph:
259
 
            # Note that we return an empty body, rather than omitting the body.
260
 
            # This way the client knows that it can always expect to find a body
261
 
            # in the response for this method, even in the error case.
262
 
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
263
 
 
264
 
        for revision, parents in revision_graph.items():
265
 
            lines.append(' '.join((revision, ) + tuple(parents)))
266
 
 
267
 
        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
268
 
 
269
 
 
270
 
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
271
 
 
272
 
    def do_repository_request(self, repository, revision_id):
273
 
        """Return ok if a specific revision is in the repository at path.
274
 
 
275
 
        :param repository: The repository to query in.
276
 
        :param revision_id: The utf8 encoded revision_id to lookup.
277
 
        :return: A smart server response of ('ok', ) if the revision is
278
 
            present.
279
 
        """
280
 
        if repository.has_revision(revision_id):
281
 
            return SuccessfulSmartServerResponse(('yes', ))
282
 
        else:
283
 
            return SuccessfulSmartServerResponse(('no', ))
284
 
 
285
 
 
286
 
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
287
 
 
288
 
    def do_repository_request(self, repository, revid, committers):
289
 
        """Return the result of repository.gather_stats().
290
 
 
291
 
        :param repository: The repository to query in.
292
 
        :param revid: utf8 encoded rev id or an empty string to indicate None
293
 
        :param committers: 'yes' or 'no'.
294
 
 
295
 
        :return: A SmartServerResponse ('ok',), a encoded body looking like
296
 
              committers: 1
297
 
              firstrev: 1234.230 0
298
 
              latestrev: 345.700 3600
299
 
              revisions: 2
300
 
 
301
 
              But containing only fields returned by the gather_stats() call
302
 
        """
303
 
        if revid == '':
304
 
            decoded_revision_id = None
305
 
        else:
306
 
            decoded_revision_id = revid
307
 
        if committers == 'yes':
308
 
            decoded_committers = True
309
 
        else:
310
 
            decoded_committers = None
311
 
        stats = repository.gather_stats(decoded_revision_id, decoded_committers)
312
 
 
313
 
        body = ''
314
 
        if stats.has_key('committers'):
315
 
            body += 'committers: %d\n' % stats['committers']
316
 
        if stats.has_key('firstrev'):
317
 
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
318
 
        if stats.has_key('latestrev'):
319
 
             body += 'latestrev: %.3f %d\n' % stats['latestrev']
320
 
        if stats.has_key('revisions'):
321
 
            body += 'revisions: %d\n' % stats['revisions']
322
 
        if stats.has_key('size'):
323
 
            body += 'size: %d\n' % stats['size']
324
 
 
325
 
        return SuccessfulSmartServerResponse(('ok', ), body)
326
 
 
327
 
 
328
 
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
329
 
 
330
 
    def do_repository_request(self, repository):
331
 
        """Return the result of repository.is_shared().
332
 
 
333
 
        :param repository: The repository to query in.
334
 
        :return: A smart server response of ('yes', ) if the repository is
335
 
            shared, and ('no', ) if it is not.
336
 
        """
337
 
        if repository.is_shared():
338
 
            return SuccessfulSmartServerResponse(('yes', ))
339
 
        else:
340
 
            return SuccessfulSmartServerResponse(('no', ))
341
 
 
342
 
 
343
 
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
344
 
 
345
 
    def do_repository_request(self, repository, token=''):
346
 
        # XXX: this probably should not have a token.
347
 
        if token == '':
348
 
            token = None
349
 
        try:
350
 
            token = repository.lock_write(token=token)
351
 
        except errors.LockContention, e:
352
 
            return FailedSmartServerResponse(('LockContention',))
353
 
        except errors.UnlockableTransport:
354
 
            return FailedSmartServerResponse(('UnlockableTransport',))
355
 
        except errors.LockFailed, e:
356
 
            return FailedSmartServerResponse(('LockFailed',
357
 
                str(e.lock), str(e.why)))
358
 
        if token is not None:
359
 
            repository.leave_lock_in_place()
360
 
        repository.unlock()
361
 
        if token is None:
362
 
            token = ''
363
 
        return SuccessfulSmartServerResponse(('ok', token))
364
 
 
365
 
 
366
 
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
367
 
 
368
 
    def do_repository_request(self, repository, to_network_name):
369
 
        """Get a stream for inserting into a to_format repository.
370
 
 
371
 
        :param repository: The repository to stream from.
372
 
        :param to_network_name: The network name of the format of the target
373
 
            repository.
374
 
        """
375
 
        self._to_format = network_format_registry.get(to_network_name)
376
 
        return None # Signal that we want a body.
377
 
 
378
 
    def do_body(self, body_bytes):
379
 
        repository = self._repository
380
 
        repository.lock_read()
381
 
        try:
382
 
            search_result, error = self.recreate_search(repository, body_bytes)
383
 
            if error is not None:
384
 
                repository.unlock()
385
 
                return error
386
 
            source = repository._get_source(self._to_format)
387
 
            stream = source.get_stream(search_result)
388
 
        except Exception:
389
 
            exc_info = sys.exc_info()
390
 
            try:
391
 
                # On non-error, unlocking is done by the body stream handler.
392
 
                repository.unlock()
393
 
            finally:
394
 
                raise exc_info[0], exc_info[1], exc_info[2]
395
 
        return SuccessfulSmartServerResponse(('ok',),
396
 
            body_stream=self.body_stream(stream, repository))
397
 
 
398
 
    def body_stream(self, stream, repository):
399
 
        byte_stream = _stream_to_byte_stream(stream, repository._format)
400
 
        try:
401
 
            for bytes in byte_stream:
402
 
                yield bytes
403
 
        except errors.RevisionNotPresent, e:
404
 
            # This shouldn't be able to happen, but as we don't buffer
405
 
            # everything it can in theory happen.
406
 
            repository.unlock()
407
 
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
408
 
        else:
409
 
            repository.unlock()
410
 
 
411
 
 
412
 
def _stream_to_byte_stream(stream, src_format):
413
 
    """Convert a record stream to a self delimited byte stream."""
414
 
    pack_writer = pack.ContainerSerialiser()
415
 
    yield pack_writer.begin()
416
 
    yield pack_writer.bytes_record(src_format.network_name(), '')
417
 
    for substream_type, substream in stream:
418
 
        for record in substream:
419
 
            if record.storage_kind in ('chunked', 'fulltext'):
420
 
                serialised = record_to_fulltext_bytes(record)
421
 
            else:
422
 
                serialised = record.get_bytes_as(record.storage_kind)
423
 
            if serialised:
424
 
                # Some streams embed the whole stream into the wire
425
 
                # representation of the first record, which means that
426
 
                # later records have no wire representation: we skip them.
427
 
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
428
 
    yield pack_writer.end()
429
 
 
430
 
 
431
 
def _byte_stream_to_stream(byte_stream):
432
 
    """Convert a byte stream into a format and a stream.
433
 
 
434
 
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
435
 
    :return: (RepositoryFormat, stream_generator)
436
 
    """
437
 
    stream_decoder = pack.ContainerPushParser()
438
 
    def record_stream():
439
 
        """Closure to return the substreams."""
440
 
        # May have fully parsed records already.
441
 
        for record in stream_decoder.read_pending_records():
442
 
            record_names, record_bytes = record
443
 
            record_name, = record_names
444
 
            substream_type = record_name[0]
445
 
            substream = NetworkRecordStream([record_bytes])
446
 
            yield substream_type, substream.read()
447
 
        for bytes in byte_stream:
448
 
            stream_decoder.accept_bytes(bytes)
449
 
            for record in stream_decoder.read_pending_records():
450
 
                record_names, record_bytes = record
451
 
                record_name, = record_names
452
 
                substream_type = record_name[0]
453
 
                substream = NetworkRecordStream([record_bytes])
454
 
                yield substream_type, substream.read()
455
 
    for bytes in byte_stream:
456
 
        stream_decoder.accept_bytes(bytes)
457
 
        for record in stream_decoder.read_pending_records(max=1):
458
 
            record_names, src_format_name = record
459
 
            src_format = network_format_registry.get(src_format_name)
460
 
            return src_format, record_stream()
461
 
 
462
 
 
463
 
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
464
 
 
465
 
    def do_repository_request(self, repository, token):
466
 
        try:
467
 
            repository.lock_write(token=token)
468
 
        except errors.TokenMismatch, e:
469
 
            return FailedSmartServerResponse(('TokenMismatch',))
470
 
        repository.dont_leave_lock_in_place()
471
 
        repository.unlock()
472
 
        return SuccessfulSmartServerResponse(('ok',))
473
 
 
474
 
 
475
 
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
476
 
 
477
 
    def do_repository_request(self, repository, str_bool_new_value):
478
 
        if str_bool_new_value == 'True':
479
 
            new_value = True
480
 
        else:
481
 
            new_value = False
482
 
        repository.set_make_working_trees(new_value)
483
 
        return SuccessfulSmartServerResponse(('ok',))
484
 
 
485
 
 
486
 
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
487
 
    """Get the raw repository files as a tarball.
488
 
 
489
 
    The returned tarball contains a .bzr control directory which in turn
490
 
    contains a repository.
491
 
 
492
 
    This takes one parameter, compression, which currently must be
493
 
    "", "gz", or "bz2".
494
 
 
495
 
    This is used to implement the Repository.copy_content_into operation.
496
 
    """
497
 
 
498
 
    def do_repository_request(self, repository, compression):
499
 
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
500
 
        try:
501
 
            controldir_name = tmp_dirname + '/.bzr'
502
 
            return self._tarfile_response(controldir_name, compression)
503
 
        finally:
504
 
            osutils.rmtree(tmp_dirname)
505
 
 
506
 
    def _copy_to_tempdir(self, from_repo):
507
 
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
508
 
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
509
 
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
510
 
        from_repo.copy_content_into(tmp_repo)
511
 
        return tmp_dirname, tmp_repo
512
 
 
513
 
    def _tarfile_response(self, tmp_dirname, compression):
514
 
        temp = tempfile.NamedTemporaryFile()
515
 
        try:
516
 
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
517
 
            # all finished; write the tempfile out to the network
518
 
            temp.seek(0)
519
 
            return SuccessfulSmartServerResponse(('ok',), temp.read())
520
 
            # FIXME: Don't read the whole thing into memory here; rather stream
521
 
            # it out from the file onto the network. mbp 20070411
522
 
        finally:
523
 
            temp.close()
524
 
 
525
 
    def _tarball_of_dir(self, dirname, compression, ofile):
526
 
        filename = os.path.basename(ofile.name)
527
 
        tarball = tarfile.open(fileobj=ofile, name=filename,
528
 
            mode='w|' + compression)
529
 
        try:
530
 
            # The tarball module only accepts ascii names, and (i guess)
531
 
            # packs them with their 8bit names.  We know all the files
532
 
            # within the repository have ASCII names so the should be safe
533
 
            # to pack in.
534
 
            dirname = dirname.encode(sys.getfilesystemencoding())
535
 
            # python's tarball module includes the whole path by default so
536
 
            # override it
537
 
            if not dirname.endswith('.bzr'):
538
 
                raise ValueError(dirname)
539
 
            tarball.add(dirname, '.bzr') # recursive by default
540
 
        finally:
541
 
            tarball.close()
542
 
 
543
 
 
544
 
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
545
 
    """Insert a record stream from a RemoteSink into a repository.
546
 
 
547
 
    This gets bytes pushed to it by the network infrastructure and turns that
548
 
    into a bytes iterator using a thread. That is then processed by
549
 
    _byte_stream_to_stream.
550
 
 
551
 
    New in 1.14.
552
 
    """
553
 
 
554
 
    def do_repository_request(self, repository, resume_tokens, lock_token):
555
 
        """StreamSink.insert_stream for a remote repository."""
556
 
        repository.lock_write(token=lock_token)
557
 
        self.do_insert_stream_request(repository, resume_tokens)
558
 
 
559
 
    def do_insert_stream_request(self, repository, resume_tokens):
560
 
        tokens = [token for token in resume_tokens.split(' ') if token]
561
 
        self.tokens = tokens
562
 
        self.repository = repository
563
 
        self.queue = Queue.Queue()
564
 
        self.insert_thread = threading.Thread(target=self._inserter_thread)
565
 
        self.insert_thread.start()
566
 
 
567
 
    def do_chunk(self, body_stream_chunk):
568
 
        self.queue.put(body_stream_chunk)
569
 
 
570
 
    def _inserter_thread(self):
571
 
        try:
572
 
            src_format, stream = _byte_stream_to_stream(
573
 
                self.blocking_byte_stream())
574
 
            self.insert_result = self.repository._get_sink().insert_stream(
575
 
                stream, src_format, self.tokens)
576
 
            self.insert_ok = True
577
 
        except:
578
 
            self.insert_exception = sys.exc_info()
579
 
            self.insert_ok = False
580
 
 
581
 
    def blocking_byte_stream(self):
582
 
        while True:
583
 
            bytes = self.queue.get()
584
 
            if bytes is StopIteration:
585
 
                return
586
 
            else:
587
 
                yield bytes
588
 
 
589
 
    def do_end(self):
590
 
        self.queue.put(StopIteration)
591
 
        if self.insert_thread is not None:
592
 
            self.insert_thread.join()
593
 
        if not self.insert_ok:
594
 
            exc_info = self.insert_exception
595
 
            raise exc_info[0], exc_info[1], exc_info[2]
596
 
        write_group_tokens, missing_keys = self.insert_result
597
 
        if write_group_tokens or missing_keys:
598
 
            # bzip needed? missing keys should typically be a small set.
599
 
            # Should this be a streaming body response ?
600
 
            missing_keys = sorted(missing_keys)
601
 
            bytes = bencode.bencode((write_group_tokens, missing_keys))
602
 
            self.repository.unlock()
603
 
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
604
 
        else:
605
 
            self.repository.unlock()
606
 
            return SuccessfulSmartServerResponse(('ok', ))
607
 
 
608
 
 
609
 
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
610
 
    """Insert a record stream from a RemoteSink into an unlocked repository.
611
 
 
612
 
    This is the same as SmartServerRepositoryInsertStreamLocked, except it
613
 
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
614
 
    like pack format) repository.
615
 
 
616
 
    New in 1.13.
617
 
    """
618
 
 
619
 
    def do_repository_request(self, repository, resume_tokens):
620
 
        """StreamSink.insert_stream for a remote repository."""
621
 
        repository.lock_write()
622
 
        self.do_insert_stream_request(repository, resume_tokens)
623
 
 
624