~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

Fix lock test failures by taking lock breaking into account.

* tests/test_lockdir.py:
(TestLockDir.test_43_break): Release the lock after breaking and
acquiring it.

* tests/__init__.py:
(TestCase._check_locks): Consider lock breaks as releases.
(TestCase._track_locks, TestCase._lock_broken): Also track broken locks.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Server-side repository related request implmentations."""
 
18
 
 
19
import bz2
 
20
import os
 
21
import Queue
 
22
import struct
 
23
import sys
 
24
import tarfile
 
25
import tempfile
 
26
import threading
 
27
 
 
28
from bzrlib import (
 
29
    errors,
 
30
    graph,
 
31
    osutils,
 
32
    pack,
 
33
    )
 
34
from bzrlib.bzrdir import BzrDir
 
35
from bzrlib.smart.request import (
 
36
    FailedSmartServerResponse,
 
37
    SmartServerRequest,
 
38
    SuccessfulSmartServerResponse,
 
39
    )
 
40
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
 
41
from bzrlib import revision as _mod_revision
 
42
from bzrlib.util import bencode
 
43
from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
 
44
 
 
45
 
 
46
class SmartServerRepositoryRequest(SmartServerRequest):
 
47
    """Common base class for Repository requests."""
 
48
 
 
49
    def do(self, path, *args):
 
50
        """Execute a repository request.
 
51
 
 
52
        All Repository requests take a path to the repository as their first
 
53
        argument.  The repository must be at the exact path given by the
 
54
        client - no searching is done.
 
55
 
 
56
        The actual logic is delegated to self.do_repository_request.
 
57
 
 
58
        :param client_path: The path for the repository as received from the
 
59
            client.
 
60
        :return: A SmartServerResponse from self.do_repository_request().
 
61
        """
 
62
        transport = self.transport_from_client_path(path)
 
63
        bzrdir = BzrDir.open_from_transport(transport)
 
64
        # Save the repository for use with do_body.
 
65
        self._repository = bzrdir.open_repository()
 
66
        return self.do_repository_request(self._repository, *args)
 
67
 
 
68
    def do_repository_request(self, repository, *args):
 
69
        """Override to provide an implementation for a verb."""
 
70
        # No-op for verbs that take bodies (None as a result indicates a body
 
71
        # is expected)
 
72
        return None
 
73
 
 
74
    def recreate_search(self, repository, search_bytes):
 
75
        lines = search_bytes.split('\n')
 
76
        if lines[0] == 'ancestry-of':
 
77
            heads = lines[1:]
 
78
            search_result = graph.PendingAncestryResult(heads, repository)
 
79
            return search_result, None
 
80
        elif lines[0] == 'search':
 
81
            return self.recreate_search_from_recipe(repository, lines[1:])
 
82
        else:
 
83
            return (None, FailedSmartServerResponse(('BadSearch',)))
 
84
 
 
85
    def recreate_search_from_recipe(self, repository, lines):
 
86
        start_keys = set(lines[0].split(' '))
 
87
        exclude_keys = set(lines[1].split(' '))
 
88
        revision_count = int(lines[2])
 
89
        repository.lock_read()
 
90
        try:
 
91
            search = repository.get_graph()._make_breadth_first_searcher(
 
92
                start_keys)
 
93
            while True:
 
94
                try:
 
95
                    next_revs = search.next()
 
96
                except StopIteration:
 
97
                    break
 
98
                search.stop_searching_any(exclude_keys.intersection(next_revs))
 
99
            search_result = search.get_result()
 
100
            if search_result.get_recipe()[3] != revision_count:
 
101
                # we got back a different amount of data than expected, this
 
102
                # gets reported as NoSuchRevision, because less revisions
 
103
                # indicates missing revisions, and more should never happen as
 
104
                # the excludes list considers ghosts and ensures that ghost
 
105
                # filling races are not a problem.
 
106
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
107
            return (search_result, None)
 
108
        finally:
 
109
            repository.unlock()
 
110
 
 
111
 
 
112
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
 
113
    """Calls self.do_readlocked_repository_request."""
 
114
 
 
115
    def do_repository_request(self, repository, *args):
 
116
        """Read lock a repository for do_readlocked_repository_request."""
 
117
        repository.lock_read()
 
118
        try:
 
119
            return self.do_readlocked_repository_request(repository, *args)
 
120
        finally:
 
121
            repository.unlock()
 
122
 
 
123
 
 
124
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
 
125
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
 
126
 
 
127
    no_extra_results = False
 
128
 
 
129
    def do_repository_request(self, repository, *revision_ids):
 
130
        """Get parent details for some revisions.
 
131
 
 
132
        All the parents for revision_ids are returned. Additionally up to 64KB
 
133
        of additional parent data found by performing a breadth first search
 
134
        from revision_ids is returned. The verb takes a body containing the
 
135
        current search state, see do_body for details.
 
136
 
 
137
        If 'include-missing:' is in revision_ids, ghosts encountered in the
 
138
        graph traversal for getting parent data are included in the result with
 
139
        a prefix of 'missing:'.
 
140
 
 
141
        :param repository: The repository to query in.
 
142
        :param revision_ids: The utf8 encoded revision_id to answer for.
 
143
        """
 
144
        self._revision_ids = revision_ids
 
145
        return None # Signal that we want a body.
 
146
 
 
147
    def do_body(self, body_bytes):
 
148
        """Process the current search state and perform the parent lookup.
 
149
 
 
150
        :return: A smart server response where the body contains an utf8
 
151
            encoded flattened list of the parents of the revisions (the same
 
152
            format as Repository.get_revision_graph) which has been bz2
 
153
            compressed.
 
154
        """
 
155
        repository = self._repository
 
156
        repository.lock_read()
 
157
        try:
 
158
            return self._do_repository_request(body_bytes)
 
159
        finally:
 
160
            repository.unlock()
 
161
 
 
162
    def _do_repository_request(self, body_bytes):
 
163
        repository = self._repository
 
164
        revision_ids = set(self._revision_ids)
 
165
        include_missing = 'include-missing:' in revision_ids
 
166
        if include_missing:
 
167
            revision_ids.remove('include-missing:')
 
168
        body_lines = body_bytes.split('\n')
 
169
        search_result, error = self.recreate_search_from_recipe(
 
170
            repository, body_lines)
 
171
        if error is not None:
 
172
            return error
 
173
        # TODO might be nice to start up the search again; but thats not
 
174
        # written or tested yet.
 
175
        client_seen_revs = set(search_result.get_keys())
 
176
        # Always include the requested ids.
 
177
        client_seen_revs.difference_update(revision_ids)
 
178
        lines = []
 
179
        repo_graph = repository.get_graph()
 
180
        result = {}
 
181
        queried_revs = set()
 
182
        size_so_far = 0
 
183
        next_revs = revision_ids
 
184
        first_loop_done = False
 
185
        while next_revs:
 
186
            queried_revs.update(next_revs)
 
187
            parent_map = repo_graph.get_parent_map(next_revs)
 
188
            current_revs = next_revs
 
189
            next_revs = set()
 
190
            for revision_id in current_revs:
 
191
                missing_rev = False
 
192
                parents = parent_map.get(revision_id)
 
193
                if parents is not None:
 
194
                    # adjust for the wire
 
195
                    if parents == (_mod_revision.NULL_REVISION,):
 
196
                        parents = ()
 
197
                    # prepare the next query
 
198
                    next_revs.update(parents)
 
199
                    encoded_id = revision_id
 
200
                else:
 
201
                    missing_rev = True
 
202
                    encoded_id = "missing:" + revision_id
 
203
                    parents = []
 
204
                if (revision_id not in client_seen_revs and
 
205
                    (not missing_rev or include_missing)):
 
206
                    # Client does not have this revision, give it to it.
 
207
                    # add parents to the result
 
208
                    result[encoded_id] = parents
 
209
                    # Approximate the serialized cost of this revision_id.
 
210
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
 
211
            # get all the directly asked for parents, and then flesh out to
 
212
            # 64K (compressed) or so. We do one level of depth at a time to
 
213
            # stay in sync with the client. The 250000 magic number is
 
214
            # estimated compression ratio taken from bzr.dev itself.
 
215
            if self.no_extra_results or (
 
216
                first_loop_done and size_so_far > 250000):
 
217
                next_revs = set()
 
218
                break
 
219
            # don't query things we've already queried
 
220
            next_revs.difference_update(queried_revs)
 
221
            first_loop_done = True
 
222
 
 
223
        # sorting trivially puts lexographically similar revision ids together.
 
224
        # Compression FTW.
 
225
        for revision, parents in sorted(result.items()):
 
226
            lines.append(' '.join((revision, ) + tuple(parents)))
 
227
 
 
228
        return SuccessfulSmartServerResponse(
 
229
            ('ok', ), bz2.compress('\n'.join(lines)))
 
230
 
 
231
 
 
232
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
 
233
 
 
234
    def do_readlocked_repository_request(self, repository, revision_id):
 
235
        """Return the result of repository.get_revision_graph(revision_id).
 
236
 
 
237
        Deprecated as of bzr 1.4, but supported for older clients.
 
238
 
 
239
        :param repository: The repository to query in.
 
240
        :param revision_id: The utf8 encoded revision_id to get a graph from.
 
241
        :return: A smart server response where the body contains an utf8
 
242
            encoded flattened list of the revision graph.
 
243
        """
 
244
        if not revision_id:
 
245
            revision_id = None
 
246
 
 
247
        lines = []
 
248
        graph = repository.get_graph()
 
249
        if revision_id:
 
250
            search_ids = [revision_id]
 
251
        else:
 
252
            search_ids = repository.all_revision_ids()
 
253
        search = graph._make_breadth_first_searcher(search_ids)
 
254
        transitive_ids = set()
 
255
        map(transitive_ids.update, list(search))
 
256
        parent_map = graph.get_parent_map(transitive_ids)
 
257
        revision_graph = _strip_NULL_ghosts(parent_map)
 
258
        if revision_id and revision_id not in revision_graph:
 
259
            # Note that we return an empty body, rather than omitting the body.
 
260
            # This way the client knows that it can always expect to find a body
 
261
            # in the response for this method, even in the error case.
 
262
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
 
263
 
 
264
        for revision, parents in revision_graph.items():
 
265
            lines.append(' '.join((revision, ) + tuple(parents)))
 
266
 
 
267
        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
 
268
 
 
269
 
 
270
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
 
271
 
 
272
    def do_repository_request(self, repository, revision_id):
 
273
        """Return ok if a specific revision is in the repository at path.
 
274
 
 
275
        :param repository: The repository to query in.
 
276
        :param revision_id: The utf8 encoded revision_id to lookup.
 
277
        :return: A smart server response of ('ok', ) if the revision is
 
278
            present.
 
279
        """
 
280
        if repository.has_revision(revision_id):
 
281
            return SuccessfulSmartServerResponse(('yes', ))
 
282
        else:
 
283
            return SuccessfulSmartServerResponse(('no', ))
 
284
 
 
285
 
 
286
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
 
287
 
 
288
    def do_repository_request(self, repository, revid, committers):
 
289
        """Return the result of repository.gather_stats().
 
290
 
 
291
        :param repository: The repository to query in.
 
292
        :param revid: utf8 encoded rev id or an empty string to indicate None
 
293
        :param committers: 'yes' or 'no'.
 
294
 
 
295
        :return: A SmartServerResponse ('ok',), a encoded body looking like
 
296
              committers: 1
 
297
              firstrev: 1234.230 0
 
298
              latestrev: 345.700 3600
 
299
              revisions: 2
 
300
 
 
301
              But containing only fields returned by the gather_stats() call
 
302
        """
 
303
        if revid == '':
 
304
            decoded_revision_id = None
 
305
        else:
 
306
            decoded_revision_id = revid
 
307
        if committers == 'yes':
 
308
            decoded_committers = True
 
309
        else:
 
310
            decoded_committers = None
 
311
        stats = repository.gather_stats(decoded_revision_id, decoded_committers)
 
312
 
 
313
        body = ''
 
314
        if stats.has_key('committers'):
 
315
            body += 'committers: %d\n' % stats['committers']
 
316
        if stats.has_key('firstrev'):
 
317
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
 
318
        if stats.has_key('latestrev'):
 
319
             body += 'latestrev: %.3f %d\n' % stats['latestrev']
 
320
        if stats.has_key('revisions'):
 
321
            body += 'revisions: %d\n' % stats['revisions']
 
322
        if stats.has_key('size'):
 
323
            body += 'size: %d\n' % stats['size']
 
324
 
 
325
        return SuccessfulSmartServerResponse(('ok', ), body)
 
326
 
 
327
 
 
328
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
 
329
 
 
330
    def do_repository_request(self, repository):
 
331
        """Return the result of repository.is_shared().
 
332
 
 
333
        :param repository: The repository to query in.
 
334
        :return: A smart server response of ('yes', ) if the repository is
 
335
            shared, and ('no', ) if it is not.
 
336
        """
 
337
        if repository.is_shared():
 
338
            return SuccessfulSmartServerResponse(('yes', ))
 
339
        else:
 
340
            return SuccessfulSmartServerResponse(('no', ))
 
341
 
 
342
 
 
343
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
 
344
 
 
345
    def do_repository_request(self, repository, token=''):
 
346
        # XXX: this probably should not have a token.
 
347
        if token == '':
 
348
            token = None
 
349
        try:
 
350
            token = repository.lock_write(token=token)
 
351
        except errors.LockContention, e:
 
352
            return FailedSmartServerResponse(('LockContention',))
 
353
        except errors.UnlockableTransport:
 
354
            return FailedSmartServerResponse(('UnlockableTransport',))
 
355
        except errors.LockFailed, e:
 
356
            return FailedSmartServerResponse(('LockFailed',
 
357
                str(e.lock), str(e.why)))
 
358
        if token is not None:
 
359
            repository.leave_lock_in_place()
 
360
        repository.unlock()
 
361
        if token is None:
 
362
            token = ''
 
363
        return SuccessfulSmartServerResponse(('ok', token))
 
364
 
 
365
 
 
366
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
 
367
 
 
368
    def do_repository_request(self, repository, to_network_name):
 
369
        """Get a stream for inserting into a to_format repository.
 
370
 
 
371
        :param repository: The repository to stream from.
 
372
        :param to_network_name: The network name of the format of the target
 
373
            repository.
 
374
        """
 
375
        self._to_format = network_format_registry.get(to_network_name)
 
376
        return None # Signal that we want a body.
 
377
 
 
378
    def do_body(self, body_bytes):
 
379
        repository = self._repository
 
380
        repository.lock_read()
 
381
        try:
 
382
            search_result, error = self.recreate_search(repository, body_bytes)
 
383
            if error is not None:
 
384
                repository.unlock()
 
385
                return error
 
386
            source = repository._get_source(self._to_format)
 
387
            stream = source.get_stream(search_result)
 
388
        except Exception:
 
389
            exc_info = sys.exc_info()
 
390
            try:
 
391
                # On non-error, unlocking is done by the body stream handler.
 
392
                repository.unlock()
 
393
            finally:
 
394
                raise exc_info[0], exc_info[1], exc_info[2]
 
395
        return SuccessfulSmartServerResponse(('ok',),
 
396
            body_stream=self.body_stream(stream, repository))
 
397
 
 
398
    def body_stream(self, stream, repository):
 
399
        byte_stream = _stream_to_byte_stream(stream, repository._format)
 
400
        try:
 
401
            for bytes in byte_stream:
 
402
                yield bytes
 
403
        except errors.RevisionNotPresent, e:
 
404
            # This shouldn't be able to happen, but as we don't buffer
 
405
            # everything it can in theory happen.
 
406
            repository.unlock()
 
407
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
408
        else:
 
409
            repository.unlock()
 
410
 
 
411
 
 
412
def _stream_to_byte_stream(stream, src_format):
 
413
    """Convert a record stream to a self delimited byte stream."""
 
414
    pack_writer = pack.ContainerSerialiser()
 
415
    yield pack_writer.begin()
 
416
    yield pack_writer.bytes_record(src_format.network_name(), '')
 
417
    for substream_type, substream in stream:
 
418
        for record in substream:
 
419
            if record.storage_kind in ('chunked', 'fulltext'):
 
420
                serialised = record_to_fulltext_bytes(record)
 
421
            else:
 
422
                serialised = record.get_bytes_as(record.storage_kind)
 
423
            if serialised:
 
424
                # Some streams embed the whole stream into the wire
 
425
                # representation of the first record, which means that
 
426
                # later records have no wire representation: we skip them.
 
427
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
 
428
    yield pack_writer.end()
 
429
 
 
430
 
 
431
def _byte_stream_to_stream(byte_stream):
 
432
    """Convert a byte stream into a format and a stream.
 
433
 
 
434
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
 
435
    :return: (RepositoryFormat, stream_generator)
 
436
    """
 
437
    stream_decoder = pack.ContainerPushParser()
 
438
    def record_stream():
 
439
        """Closure to return the substreams."""
 
440
        # May have fully parsed records already.
 
441
        for record in stream_decoder.read_pending_records():
 
442
            record_names, record_bytes = record
 
443
            record_name, = record_names
 
444
            substream_type = record_name[0]
 
445
            substream = NetworkRecordStream([record_bytes])
 
446
            yield substream_type, substream.read()
 
447
        for bytes in byte_stream:
 
448
            stream_decoder.accept_bytes(bytes)
 
449
            for record in stream_decoder.read_pending_records():
 
450
                record_names, record_bytes = record
 
451
                record_name, = record_names
 
452
                substream_type = record_name[0]
 
453
                substream = NetworkRecordStream([record_bytes])
 
454
                yield substream_type, substream.read()
 
455
    for bytes in byte_stream:
 
456
        stream_decoder.accept_bytes(bytes)
 
457
        for record in stream_decoder.read_pending_records(max=1):
 
458
            record_names, src_format_name = record
 
459
            src_format = network_format_registry.get(src_format_name)
 
460
            return src_format, record_stream()
 
461
 
 
462
 
 
463
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
 
464
 
 
465
    def do_repository_request(self, repository, token):
 
466
        try:
 
467
            repository.lock_write(token=token)
 
468
        except errors.TokenMismatch, e:
 
469
            return FailedSmartServerResponse(('TokenMismatch',))
 
470
        repository.dont_leave_lock_in_place()
 
471
        repository.unlock()
 
472
        return SuccessfulSmartServerResponse(('ok',))
 
473
 
 
474
 
 
475
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
 
476
 
 
477
    def do_repository_request(self, repository, str_bool_new_value):
 
478
        if str_bool_new_value == 'True':
 
479
            new_value = True
 
480
        else:
 
481
            new_value = False
 
482
        repository.set_make_working_trees(new_value)
 
483
        return SuccessfulSmartServerResponse(('ok',))
 
484
 
 
485
 
 
486
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
 
487
    """Get the raw repository files as a tarball.
 
488
 
 
489
    The returned tarball contains a .bzr control directory which in turn
 
490
    contains a repository.
 
491
 
 
492
    This takes one parameter, compression, which currently must be
 
493
    "", "gz", or "bz2".
 
494
 
 
495
    This is used to implement the Repository.copy_content_into operation.
 
496
    """
 
497
 
 
498
    def do_repository_request(self, repository, compression):
 
499
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
 
500
        try:
 
501
            controldir_name = tmp_dirname + '/.bzr'
 
502
            return self._tarfile_response(controldir_name, compression)
 
503
        finally:
 
504
            osutils.rmtree(tmp_dirname)
 
505
 
 
506
    def _copy_to_tempdir(self, from_repo):
 
507
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
 
508
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
 
509
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
 
510
        from_repo.copy_content_into(tmp_repo)
 
511
        return tmp_dirname, tmp_repo
 
512
 
 
513
    def _tarfile_response(self, tmp_dirname, compression):
 
514
        temp = tempfile.NamedTemporaryFile()
 
515
        try:
 
516
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
 
517
            # all finished; write the tempfile out to the network
 
518
            temp.seek(0)
 
519
            return SuccessfulSmartServerResponse(('ok',), temp.read())
 
520
            # FIXME: Don't read the whole thing into memory here; rather stream
 
521
            # it out from the file onto the network. mbp 20070411
 
522
        finally:
 
523
            temp.close()
 
524
 
 
525
    def _tarball_of_dir(self, dirname, compression, ofile):
 
526
        filename = os.path.basename(ofile.name)
 
527
        tarball = tarfile.open(fileobj=ofile, name=filename,
 
528
            mode='w|' + compression)
 
529
        try:
 
530
            # The tarball module only accepts ascii names, and (i guess)
 
531
            # packs them with their 8bit names.  We know all the files
 
532
            # within the repository have ASCII names so the should be safe
 
533
            # to pack in.
 
534
            dirname = dirname.encode(sys.getfilesystemencoding())
 
535
            # python's tarball module includes the whole path by default so
 
536
            # override it
 
537
            if not dirname.endswith('.bzr'):
 
538
                raise ValueError(dirname)
 
539
            tarball.add(dirname, '.bzr') # recursive by default
 
540
        finally:
 
541
            tarball.close()
 
542
 
 
543
 
 
544
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
 
545
    """Insert a record stream from a RemoteSink into a repository.
 
546
 
 
547
    This gets bytes pushed to it by the network infrastructure and turns that
 
548
    into a bytes iterator using a thread. That is then processed by
 
549
    _byte_stream_to_stream.
 
550
 
 
551
    New in 1.14.
 
552
    """
 
553
 
 
554
    def do_repository_request(self, repository, resume_tokens, lock_token):
 
555
        """StreamSink.insert_stream for a remote repository."""
 
556
        repository.lock_write(token=lock_token)
 
557
        self.do_insert_stream_request(repository, resume_tokens)
 
558
 
 
559
    def do_insert_stream_request(self, repository, resume_tokens):
 
560
        tokens = [token for token in resume_tokens.split(' ') if token]
 
561
        self.tokens = tokens
 
562
        self.repository = repository
 
563
        self.queue = Queue.Queue()
 
564
        self.insert_thread = threading.Thread(target=self._inserter_thread)
 
565
        self.insert_thread.start()
 
566
 
 
567
    def do_chunk(self, body_stream_chunk):
 
568
        self.queue.put(body_stream_chunk)
 
569
 
 
570
    def _inserter_thread(self):
 
571
        try:
 
572
            src_format, stream = _byte_stream_to_stream(
 
573
                self.blocking_byte_stream())
 
574
            self.insert_result = self.repository._get_sink().insert_stream(
 
575
                stream, src_format, self.tokens)
 
576
            self.insert_ok = True
 
577
        except:
 
578
            self.insert_exception = sys.exc_info()
 
579
            self.insert_ok = False
 
580
 
 
581
    def blocking_byte_stream(self):
 
582
        while True:
 
583
            bytes = self.queue.get()
 
584
            if bytes is StopIteration:
 
585
                return
 
586
            else:
 
587
                yield bytes
 
588
 
 
589
    def do_end(self):
 
590
        self.queue.put(StopIteration)
 
591
        if self.insert_thread is not None:
 
592
            self.insert_thread.join()
 
593
        if not self.insert_ok:
 
594
            exc_info = self.insert_exception
 
595
            raise exc_info[0], exc_info[1], exc_info[2]
 
596
        write_group_tokens, missing_keys = self.insert_result
 
597
        if write_group_tokens or missing_keys:
 
598
            # bzip needed? missing keys should typically be a small set.
 
599
            # Should this be a streaming body response ?
 
600
            missing_keys = sorted(missing_keys)
 
601
            bytes = bencode.bencode((write_group_tokens, missing_keys))
 
602
            self.repository.unlock()
 
603
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
 
604
        else:
 
605
            self.repository.unlock()
 
606
            return SuccessfulSmartServerResponse(('ok', ))
 
607
 
 
608
 
 
609
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
 
610
    """Insert a record stream from a RemoteSink into an unlocked repository.
 
611
 
 
612
    This is the same as SmartServerRepositoryInsertStreamLocked, except it
 
613
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
 
614
    like pack format) repository.
 
615
 
 
616
    New in 1.13.
 
617
    """
 
618
 
 
619
    def do_repository_request(self, repository, resume_tokens):
 
620
        """StreamSink.insert_stream for a remote repository."""
 
621
        repository.lock_write()
 
622
        self.do_insert_stream_request(repository, resume_tokens)
 
623
 
 
624