~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Martin Pool
  • Date: 2006-03-15 19:46:05 UTC
  • mto: (1668.1.8 bzr-0.8.mbp)
  • mto: This revision was merged to the branch mainline in revision 1710.
  • Revision ID: mbp@sourcefrog.net-20060315194605-8b8ce129bff44a1d
Start forming xmlrpc requests

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Server-side repository related request implmentations."""
18
 
 
19
 
import bz2
20
 
import os
21
 
import Queue
22
 
import sys
23
 
import tempfile
24
 
import threading
25
 
 
26
 
from bzrlib import (
27
 
    bencode,
28
 
    errors,
29
 
    graph,
30
 
    osutils,
31
 
    pack,
32
 
    ui,
33
 
    versionedfile,
34
 
    )
35
 
from bzrlib.bzrdir import BzrDir
36
 
from bzrlib.smart.request import (
37
 
    FailedSmartServerResponse,
38
 
    SmartServerRequest,
39
 
    SuccessfulSmartServerResponse,
40
 
    )
41
 
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
42
 
from bzrlib import revision as _mod_revision
43
 
from bzrlib.versionedfile import (
44
 
    NetworkRecordStream,
45
 
    record_to_fulltext_bytes,
46
 
    )
47
 
 
48
 
 
49
 
class SmartServerRepositoryRequest(SmartServerRequest):
50
 
    """Common base class for Repository requests."""
51
 
 
52
 
    def do(self, path, *args):
53
 
        """Execute a repository request.
54
 
 
55
 
        All Repository requests take a path to the repository as their first
56
 
        argument.  The repository must be at the exact path given by the
57
 
        client - no searching is done.
58
 
 
59
 
        The actual logic is delegated to self.do_repository_request.
60
 
 
61
 
        :param client_path: The path for the repository as received from the
62
 
            client.
63
 
        :return: A SmartServerResponse from self.do_repository_request().
64
 
        """
65
 
        transport = self.transport_from_client_path(path)
66
 
        bzrdir = BzrDir.open_from_transport(transport)
67
 
        # Save the repository for use with do_body.
68
 
        self._repository = bzrdir.open_repository()
69
 
        return self.do_repository_request(self._repository, *args)
70
 
 
71
 
    def do_repository_request(self, repository, *args):
72
 
        """Override to provide an implementation for a verb."""
73
 
        # No-op for verbs that take bodies (None as a result indicates a body
74
 
        # is expected)
75
 
        return None
76
 
 
77
 
    def recreate_search(self, repository, search_bytes, discard_excess=False):
78
 
        """Recreate a search from its serialised form.
79
 
 
80
 
        :param discard_excess: If True, and the search refers to data we don't
81
 
            have, just silently accept that fact - the verb calling
82
 
            recreate_search trusts that clients will look for missing things
83
 
            they expected and get it from elsewhere.
84
 
        """
85
 
        lines = search_bytes.split('\n')
86
 
        if lines[0] == 'ancestry-of':
87
 
            heads = lines[1:]
88
 
            search_result = graph.PendingAncestryResult(heads, repository)
89
 
            return search_result, None
90
 
        elif lines[0] == 'search':
91
 
            return self.recreate_search_from_recipe(repository, lines[1:],
92
 
                discard_excess=discard_excess)
93
 
        else:
94
 
            return (None, FailedSmartServerResponse(('BadSearch',)))
95
 
 
96
 
    def recreate_search_from_recipe(self, repository, lines,
97
 
        discard_excess=False):
98
 
        """Recreate a specific revision search (vs a from-tip search).
99
 
 
100
 
        :param discard_excess: If True, and the search refers to data we don't
101
 
            have, just silently accept that fact - the verb calling
102
 
            recreate_search trusts that clients will look for missing things
103
 
            they expected and get it from elsewhere.
104
 
        """
105
 
        start_keys = set(lines[0].split(' '))
106
 
        exclude_keys = set(lines[1].split(' '))
107
 
        revision_count = int(lines[2])
108
 
        repository.lock_read()
109
 
        try:
110
 
            search = repository.get_graph()._make_breadth_first_searcher(
111
 
                start_keys)
112
 
            while True:
113
 
                try:
114
 
                    next_revs = search.next()
115
 
                except StopIteration:
116
 
                    break
117
 
                search.stop_searching_any(exclude_keys.intersection(next_revs))
118
 
            search_result = search.get_result()
119
 
            if (not discard_excess and
120
 
                search_result.get_recipe()[3] != revision_count):
121
 
                # we got back a different amount of data than expected, this
122
 
                # gets reported as NoSuchRevision, because less revisions
123
 
                # indicates missing revisions, and more should never happen as
124
 
                # the excludes list considers ghosts and ensures that ghost
125
 
                # filling races are not a problem.
126
 
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
127
 
            return (search_result, None)
128
 
        finally:
129
 
            repository.unlock()
130
 
 
131
 
 
132
 
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
133
 
    """Calls self.do_readlocked_repository_request."""
134
 
 
135
 
    def do_repository_request(self, repository, *args):
136
 
        """Read lock a repository for do_readlocked_repository_request."""
137
 
        repository.lock_read()
138
 
        try:
139
 
            return self.do_readlocked_repository_request(repository, *args)
140
 
        finally:
141
 
            repository.unlock()
142
 
 
143
 
 
144
 
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
145
 
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
146
 
 
147
 
    no_extra_results = False
148
 
 
149
 
    def do_repository_request(self, repository, *revision_ids):
150
 
        """Get parent details for some revisions.
151
 
 
152
 
        All the parents for revision_ids are returned. Additionally up to 64KB
153
 
        of additional parent data found by performing a breadth first search
154
 
        from revision_ids is returned. The verb takes a body containing the
155
 
        current search state, see do_body for details.
156
 
 
157
 
        If 'include-missing:' is in revision_ids, ghosts encountered in the
158
 
        graph traversal for getting parent data are included in the result with
159
 
        a prefix of 'missing:'.
160
 
 
161
 
        :param repository: The repository to query in.
162
 
        :param revision_ids: The utf8 encoded revision_id to answer for.
163
 
        """
164
 
        self._revision_ids = revision_ids
165
 
        return None # Signal that we want a body.
166
 
 
167
 
    def do_body(self, body_bytes):
168
 
        """Process the current search state and perform the parent lookup.
169
 
 
170
 
        :return: A smart server response where the body contains an utf8
171
 
            encoded flattened list of the parents of the revisions (the same
172
 
            format as Repository.get_revision_graph) which has been bz2
173
 
            compressed.
174
 
        """
175
 
        repository = self._repository
176
 
        repository.lock_read()
177
 
        try:
178
 
            return self._do_repository_request(body_bytes)
179
 
        finally:
180
 
            repository.unlock()
181
 
 
182
 
    def _do_repository_request(self, body_bytes):
183
 
        repository = self._repository
184
 
        revision_ids = set(self._revision_ids)
185
 
        include_missing = 'include-missing:' in revision_ids
186
 
        if include_missing:
187
 
            revision_ids.remove('include-missing:')
188
 
        body_lines = body_bytes.split('\n')
189
 
        search_result, error = self.recreate_search_from_recipe(
190
 
            repository, body_lines)
191
 
        if error is not None:
192
 
            return error
193
 
        # TODO might be nice to start up the search again; but thats not
194
 
        # written or tested yet.
195
 
        client_seen_revs = set(search_result.get_keys())
196
 
        # Always include the requested ids.
197
 
        client_seen_revs.difference_update(revision_ids)
198
 
        lines = []
199
 
        repo_graph = repository.get_graph()
200
 
        result = {}
201
 
        queried_revs = set()
202
 
        size_so_far = 0
203
 
        next_revs = revision_ids
204
 
        first_loop_done = False
205
 
        while next_revs:
206
 
            queried_revs.update(next_revs)
207
 
            parent_map = repo_graph.get_parent_map(next_revs)
208
 
            current_revs = next_revs
209
 
            next_revs = set()
210
 
            for revision_id in current_revs:
211
 
                missing_rev = False
212
 
                parents = parent_map.get(revision_id)
213
 
                if parents is not None:
214
 
                    # adjust for the wire
215
 
                    if parents == (_mod_revision.NULL_REVISION,):
216
 
                        parents = ()
217
 
                    # prepare the next query
218
 
                    next_revs.update(parents)
219
 
                    encoded_id = revision_id
220
 
                else:
221
 
                    missing_rev = True
222
 
                    encoded_id = "missing:" + revision_id
223
 
                    parents = []
224
 
                if (revision_id not in client_seen_revs and
225
 
                    (not missing_rev or include_missing)):
226
 
                    # Client does not have this revision, give it to it.
227
 
                    # add parents to the result
228
 
                    result[encoded_id] = parents
229
 
                    # Approximate the serialized cost of this revision_id.
230
 
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
231
 
            # get all the directly asked for parents, and then flesh out to
232
 
            # 64K (compressed) or so. We do one level of depth at a time to
233
 
            # stay in sync with the client. The 250000 magic number is
234
 
            # estimated compression ratio taken from bzr.dev itself.
235
 
            if self.no_extra_results or (
236
 
                first_loop_done and size_so_far > 250000):
237
 
                next_revs = set()
238
 
                break
239
 
            # don't query things we've already queried
240
 
            next_revs.difference_update(queried_revs)
241
 
            first_loop_done = True
242
 
 
243
 
        # sorting trivially puts lexographically similar revision ids together.
244
 
        # Compression FTW.
245
 
        for revision, parents in sorted(result.items()):
246
 
            lines.append(' '.join((revision, ) + tuple(parents)))
247
 
 
248
 
        return SuccessfulSmartServerResponse(
249
 
            ('ok', ), bz2.compress('\n'.join(lines)))
250
 
 
251
 
 
252
 
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
253
 
 
254
 
    def do_readlocked_repository_request(self, repository, revision_id):
255
 
        """Return the result of repository.get_revision_graph(revision_id).
256
 
 
257
 
        Deprecated as of bzr 1.4, but supported for older clients.
258
 
 
259
 
        :param repository: The repository to query in.
260
 
        :param revision_id: The utf8 encoded revision_id to get a graph from.
261
 
        :return: A smart server response where the body contains an utf8
262
 
            encoded flattened list of the revision graph.
263
 
        """
264
 
        if not revision_id:
265
 
            revision_id = None
266
 
 
267
 
        lines = []
268
 
        graph = repository.get_graph()
269
 
        if revision_id:
270
 
            search_ids = [revision_id]
271
 
        else:
272
 
            search_ids = repository.all_revision_ids()
273
 
        search = graph._make_breadth_first_searcher(search_ids)
274
 
        transitive_ids = set()
275
 
        map(transitive_ids.update, list(search))
276
 
        parent_map = graph.get_parent_map(transitive_ids)
277
 
        revision_graph = _strip_NULL_ghosts(parent_map)
278
 
        if revision_id and revision_id not in revision_graph:
279
 
            # Note that we return an empty body, rather than omitting the body.
280
 
            # This way the client knows that it can always expect to find a body
281
 
            # in the response for this method, even in the error case.
282
 
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
283
 
 
284
 
        for revision, parents in revision_graph.items():
285
 
            lines.append(' '.join((revision, ) + tuple(parents)))
286
 
 
287
 
        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
288
 
 
289
 
 
290
 
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
291
 
 
292
 
    def do_readlocked_repository_request(self, repository, revno,
293
 
            known_pair):
294
 
        """Find the revid for a given revno, given a known revno/revid pair.
295
 
        
296
 
        New in 1.17.
297
 
        """
298
 
        try:
299
 
            found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
300
 
        except errors.RevisionNotPresent, err:
301
 
            if err.revision_id != known_pair[1]:
302
 
                raise AssertionError(
303
 
                    'get_rev_id_for_revno raised RevisionNotPresent for '
304
 
                    'non-initial revision: ' + err.revision_id)
305
 
            return FailedSmartServerResponse(
306
 
                ('nosuchrevision', err.revision_id))
307
 
        if found_flag:
308
 
            return SuccessfulSmartServerResponse(('ok', result))
309
 
        else:
310
 
            earliest_revno, earliest_revid = result
311
 
            return SuccessfulSmartServerResponse(
312
 
                ('history-incomplete', earliest_revno, earliest_revid))
313
 
 
314
 
 
315
 
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
316
 
 
317
 
    def do_repository_request(self, repository, revision_id):
318
 
        """Return ok if a specific revision is in the repository at path.
319
 
 
320
 
        :param repository: The repository to query in.
321
 
        :param revision_id: The utf8 encoded revision_id to lookup.
322
 
        :return: A smart server response of ('ok', ) if the revision is
323
 
            present.
324
 
        """
325
 
        if repository.has_revision(revision_id):
326
 
            return SuccessfulSmartServerResponse(('yes', ))
327
 
        else:
328
 
            return SuccessfulSmartServerResponse(('no', ))
329
 
 
330
 
 
331
 
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
332
 
 
333
 
    def do_repository_request(self, repository, revid, committers):
334
 
        """Return the result of repository.gather_stats().
335
 
 
336
 
        :param repository: The repository to query in.
337
 
        :param revid: utf8 encoded rev id or an empty string to indicate None
338
 
        :param committers: 'yes' or 'no'.
339
 
 
340
 
        :return: A SmartServerResponse ('ok',), a encoded body looking like
341
 
              committers: 1
342
 
              firstrev: 1234.230 0
343
 
              latestrev: 345.700 3600
344
 
              revisions: 2
345
 
 
346
 
              But containing only fields returned by the gather_stats() call
347
 
        """
348
 
        if revid == '':
349
 
            decoded_revision_id = None
350
 
        else:
351
 
            decoded_revision_id = revid
352
 
        if committers == 'yes':
353
 
            decoded_committers = True
354
 
        else:
355
 
            decoded_committers = None
356
 
        stats = repository.gather_stats(decoded_revision_id, decoded_committers)
357
 
 
358
 
        body = ''
359
 
        if stats.has_key('committers'):
360
 
            body += 'committers: %d\n' % stats['committers']
361
 
        if stats.has_key('firstrev'):
362
 
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
363
 
        if stats.has_key('latestrev'):
364
 
             body += 'latestrev: %.3f %d\n' % stats['latestrev']
365
 
        if stats.has_key('revisions'):
366
 
            body += 'revisions: %d\n' % stats['revisions']
367
 
        if stats.has_key('size'):
368
 
            body += 'size: %d\n' % stats['size']
369
 
 
370
 
        return SuccessfulSmartServerResponse(('ok', ), body)
371
 
 
372
 
 
373
 
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
374
 
 
375
 
    def do_repository_request(self, repository):
376
 
        """Return the result of repository.is_shared().
377
 
 
378
 
        :param repository: The repository to query in.
379
 
        :return: A smart server response of ('yes', ) if the repository is
380
 
            shared, and ('no', ) if it is not.
381
 
        """
382
 
        if repository.is_shared():
383
 
            return SuccessfulSmartServerResponse(('yes', ))
384
 
        else:
385
 
            return SuccessfulSmartServerResponse(('no', ))
386
 
 
387
 
 
388
 
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
389
 
 
390
 
    def do_repository_request(self, repository, token=''):
391
 
        # XXX: this probably should not have a token.
392
 
        if token == '':
393
 
            token = None
394
 
        try:
395
 
            token = repository.lock_write(token=token)
396
 
        except errors.LockContention, e:
397
 
            return FailedSmartServerResponse(('LockContention',))
398
 
        except errors.UnlockableTransport:
399
 
            return FailedSmartServerResponse(('UnlockableTransport',))
400
 
        except errors.LockFailed, e:
401
 
            return FailedSmartServerResponse(('LockFailed',
402
 
                str(e.lock), str(e.why)))
403
 
        if token is not None:
404
 
            repository.leave_lock_in_place()
405
 
        repository.unlock()
406
 
        if token is None:
407
 
            token = ''
408
 
        return SuccessfulSmartServerResponse(('ok', token))
409
 
 
410
 
 
411
 
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
412
 
 
413
 
    def do_repository_request(self, repository, to_network_name):
414
 
        """Get a stream for inserting into a to_format repository.
415
 
 
416
 
        :param repository: The repository to stream from.
417
 
        :param to_network_name: The network name of the format of the target
418
 
            repository.
419
 
        """
420
 
        self._to_format = network_format_registry.get(to_network_name)
421
 
        if self._should_fake_unknown():
422
 
            return FailedSmartServerResponse(
423
 
                ('UnknownMethod', 'Repository.get_stream'))
424
 
        return None # Signal that we want a body.
425
 
 
426
 
    def _should_fake_unknown(self):
427
 
        """Return True if we should return UnknownMethod to the client.
428
 
        
429
 
        This is a workaround for bugs in pre-1.19 clients that claim to
430
 
        support receiving streams of CHK repositories.  The pre-1.19 client
431
 
        expects inventory records to be serialized in the format defined by
432
 
        to_network_name, but in pre-1.19 (at least) that format definition
433
 
        tries to use the xml5 serializer, which does not correctly handle
434
 
        rich-roots.  After 1.19 the client can also accept inventory-deltas
435
 
        (which avoids this issue), and those clients will use the
436
 
        Repository.get_stream_1.19 verb instead of this one.
437
 
        So: if this repository is CHK, and the to_format doesn't match,
438
 
        we should just fake an UnknownSmartMethod error so that the client
439
 
        will fallback to VFS, rather than sending it a stream we know it
440
 
        cannot handle.
441
 
        """
442
 
        from_format = self._repository._format
443
 
        to_format = self._to_format
444
 
        if not from_format.supports_chks:
445
 
            # Source not CHK: that's ok
446
 
            return False
447
 
        if (to_format.supports_chks and
448
 
            from_format.repository_class is to_format.repository_class and
449
 
            from_format._serializer == to_format._serializer):
450
 
            # Source is CHK, but target matches: that's ok
451
 
            # (e.g. 2a->2a, or CHK2->2a)
452
 
            return False
453
 
        # Source is CHK, and target is not CHK or incompatible CHK.  We can't
454
 
        # generate a compatible stream.
455
 
        return True
456
 
 
457
 
    def do_body(self, body_bytes):
458
 
        repository = self._repository
459
 
        repository.lock_read()
460
 
        try:
461
 
            search_result, error = self.recreate_search(repository, body_bytes,
462
 
                discard_excess=True)
463
 
            if error is not None:
464
 
                repository.unlock()
465
 
                return error
466
 
            source = repository._get_source(self._to_format)
467
 
            stream = source.get_stream(search_result)
468
 
        except Exception:
469
 
            exc_info = sys.exc_info()
470
 
            try:
471
 
                # On non-error, unlocking is done by the body stream handler.
472
 
                repository.unlock()
473
 
            finally:
474
 
                raise exc_info[0], exc_info[1], exc_info[2]
475
 
        return SuccessfulSmartServerResponse(('ok',),
476
 
            body_stream=self.body_stream(stream, repository))
477
 
 
478
 
    def body_stream(self, stream, repository):
479
 
        byte_stream = _stream_to_byte_stream(stream, repository._format)
480
 
        try:
481
 
            for bytes in byte_stream:
482
 
                yield bytes
483
 
        except errors.RevisionNotPresent, e:
484
 
            # This shouldn't be able to happen, but as we don't buffer
485
 
            # everything it can in theory happen.
486
 
            repository.unlock()
487
 
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
488
 
        else:
489
 
            repository.unlock()
490
 
 
491
 
 
492
 
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
493
 
 
494
 
    def _should_fake_unknown(self):
495
 
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
496
 
        return False
497
 
 
498
 
 
499
 
def _stream_to_byte_stream(stream, src_format):
500
 
    """Convert a record stream to a self delimited byte stream."""
501
 
    pack_writer = pack.ContainerSerialiser()
502
 
    yield pack_writer.begin()
503
 
    yield pack_writer.bytes_record(src_format.network_name(), '')
504
 
    for substream_type, substream in stream:
505
 
        if substream_type == 'inventory-deltas':
506
 
            # This doesn't feel like the ideal place to issue this warning;
507
 
            # however we don't want to do it in the Repository that's
508
 
            # generating the stream, because that might be on the server.
509
 
            # Instead we try to observe it as the stream goes by.
510
 
            ui.ui_factory.warn_cross_format_fetch(src_format,
511
 
                '(remote)')
512
 
        for record in substream:
513
 
            if record.storage_kind in ('chunked', 'fulltext'):
514
 
                serialised = record_to_fulltext_bytes(record)
515
 
            elif record.storage_kind == 'inventory-delta':
516
 
                serialised = record_to_inventory_delta_bytes(record)
517
 
            elif record.storage_kind == 'absent':
518
 
                raise ValueError("Absent factory for %s" % (record.key,))
519
 
            else:
520
 
                serialised = record.get_bytes_as(record.storage_kind)
521
 
            if serialised:
522
 
                # Some streams embed the whole stream into the wire
523
 
                # representation of the first record, which means that
524
 
                # later records have no wire representation: we skip them.
525
 
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
526
 
    yield pack_writer.end()
527
 
 
528
 
 
529
 
class _ByteStreamDecoder(object):
530
 
    """Helper for _byte_stream_to_stream.
531
 
 
532
 
    The expected usage of this class is via the function _byte_stream_to_stream
533
 
    which creates a _ByteStreamDecoder, pops off the stream format and then
534
 
    yields the output of record_stream(), the main entry point to
535
 
    _ByteStreamDecoder.
536
 
 
537
 
    Broadly this class has to unwrap two layers of iterators:
538
 
    (type, substream)
539
 
    (substream details)
540
 
 
541
 
    This is complicated by wishing to return type, iterator_for_type, but
542
 
    getting the data for iterator_for_type when we find out type: we can't
543
 
    simply pass a generator down to the NetworkRecordStream parser, instead
544
 
    we have a little local state to seed each NetworkRecordStream instance,
545
 
    and gather the type that we'll be yielding.
546
 
 
547
 
    :ivar byte_stream: The byte stream being decoded.
548
 
    :ivar stream_decoder: A pack parser used to decode the bytestream
549
 
    :ivar current_type: The current type, used to join adjacent records of the
550
 
        same type into a single stream.
551
 
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
552
 
    """
553
 
 
554
 
    def __init__(self, byte_stream):
555
 
        """Create a _ByteStreamDecoder."""
556
 
        self.stream_decoder = pack.ContainerPushParser()
557
 
        self.current_type = None
558
 
        self.first_bytes = None
559
 
        self.byte_stream = byte_stream
560
 
 
561
 
    def iter_stream_decoder(self):
562
 
        """Iterate the contents of the pack from stream_decoder."""
563
 
        # dequeue pending items
564
 
        for record in self.stream_decoder.read_pending_records():
565
 
            yield record
566
 
        # Pull bytes of the wire, decode them to records, yield those records.
567
 
        for bytes in self.byte_stream:
568
 
            self.stream_decoder.accept_bytes(bytes)
569
 
            for record in self.stream_decoder.read_pending_records():
570
 
                yield record
571
 
 
572
 
    def iter_substream_bytes(self):
573
 
        if self.first_bytes is not None:
574
 
            yield self.first_bytes
575
 
            # If we run out of pack records, single the outer layer to stop.
576
 
            self.first_bytes = None
577
 
        for record in self.iter_pack_records:
578
 
            record_names, record_bytes = record
579
 
            record_name, = record_names
580
 
            substream_type = record_name[0]
581
 
            if substream_type != self.current_type:
582
 
                # end of a substream, seed the next substream.
583
 
                self.current_type = substream_type
584
 
                self.first_bytes = record_bytes
585
 
                return
586
 
            yield record_bytes
587
 
 
588
 
    def record_stream(self):
589
 
        """Yield substream_type, substream from the byte stream."""
590
 
        self.seed_state()
591
 
        # Make and consume sub generators, one per substream type:
592
 
        while self.first_bytes is not None:
593
 
            substream = NetworkRecordStream(self.iter_substream_bytes())
594
 
            # after substream is fully consumed, self.current_type is set to
595
 
            # the next type, and self.first_bytes is set to the matching bytes.
596
 
            yield self.current_type, substream.read()
597
 
 
598
 
    def seed_state(self):
599
 
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
600
 
        # Set a single generator we can use to get data from the pack stream.
601
 
        self.iter_pack_records = self.iter_stream_decoder()
602
 
        # Seed the very first subiterator with content; after this each one
603
 
        # seeds the next.
604
 
        list(self.iter_substream_bytes())
605
 
 
606
 
 
607
 
def _byte_stream_to_stream(byte_stream):
608
 
    """Convert a byte stream into a format and a stream.
609
 
 
610
 
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
611
 
    :return: (RepositoryFormat, stream_generator)
612
 
    """
613
 
    decoder = _ByteStreamDecoder(byte_stream)
614
 
    for bytes in byte_stream:
615
 
        decoder.stream_decoder.accept_bytes(bytes)
616
 
        for record in decoder.stream_decoder.read_pending_records(max=1):
617
 
            record_names, src_format_name = record
618
 
            src_format = network_format_registry.get(src_format_name)
619
 
            return src_format, decoder.record_stream()
620
 
 
621
 
 
622
 
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
623
 
 
624
 
    def do_repository_request(self, repository, token):
625
 
        try:
626
 
            repository.lock_write(token=token)
627
 
        except errors.TokenMismatch, e:
628
 
            return FailedSmartServerResponse(('TokenMismatch',))
629
 
        repository.dont_leave_lock_in_place()
630
 
        repository.unlock()
631
 
        return SuccessfulSmartServerResponse(('ok',))
632
 
 
633
 
 
634
 
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
635
 
 
636
 
    def do_repository_request(self, repository, str_bool_new_value):
637
 
        if str_bool_new_value == 'True':
638
 
            new_value = True
639
 
        else:
640
 
            new_value = False
641
 
        repository.set_make_working_trees(new_value)
642
 
        return SuccessfulSmartServerResponse(('ok',))
643
 
 
644
 
 
645
 
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
646
 
    """Get the raw repository files as a tarball.
647
 
 
648
 
    The returned tarball contains a .bzr control directory which in turn
649
 
    contains a repository.
650
 
 
651
 
    This takes one parameter, compression, which currently must be
652
 
    "", "gz", or "bz2".
653
 
 
654
 
    This is used to implement the Repository.copy_content_into operation.
655
 
    """
656
 
 
657
 
    def do_repository_request(self, repository, compression):
658
 
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
659
 
        try:
660
 
            controldir_name = tmp_dirname + '/.bzr'
661
 
            return self._tarfile_response(controldir_name, compression)
662
 
        finally:
663
 
            osutils.rmtree(tmp_dirname)
664
 
 
665
 
    def _copy_to_tempdir(self, from_repo):
666
 
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
667
 
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
668
 
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
669
 
        from_repo.copy_content_into(tmp_repo)
670
 
        return tmp_dirname, tmp_repo
671
 
 
672
 
    def _tarfile_response(self, tmp_dirname, compression):
673
 
        temp = tempfile.NamedTemporaryFile()
674
 
        try:
675
 
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
676
 
            # all finished; write the tempfile out to the network
677
 
            temp.seek(0)
678
 
            return SuccessfulSmartServerResponse(('ok',), temp.read())
679
 
            # FIXME: Don't read the whole thing into memory here; rather stream
680
 
            # it out from the file onto the network. mbp 20070411
681
 
        finally:
682
 
            temp.close()
683
 
 
684
 
    def _tarball_of_dir(self, dirname, compression, ofile):
685
 
        import tarfile
686
 
        filename = os.path.basename(ofile.name)
687
 
        tarball = tarfile.open(fileobj=ofile, name=filename,
688
 
            mode='w|' + compression)
689
 
        try:
690
 
            # The tarball module only accepts ascii names, and (i guess)
691
 
            # packs them with their 8bit names.  We know all the files
692
 
            # within the repository have ASCII names so the should be safe
693
 
            # to pack in.
694
 
            dirname = dirname.encode(sys.getfilesystemencoding())
695
 
            # python's tarball module includes the whole path by default so
696
 
            # override it
697
 
            if not dirname.endswith('.bzr'):
698
 
                raise ValueError(dirname)
699
 
            tarball.add(dirname, '.bzr') # recursive by default
700
 
        finally:
701
 
            tarball.close()
702
 
 
703
 
 
704
 
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
705
 
    """Insert a record stream from a RemoteSink into a repository.
706
 
 
707
 
    This gets bytes pushed to it by the network infrastructure and turns that
708
 
    into a bytes iterator using a thread. That is then processed by
709
 
    _byte_stream_to_stream.
710
 
 
711
 
    New in 1.14.
712
 
    """
713
 
 
714
 
    def do_repository_request(self, repository, resume_tokens, lock_token):
715
 
        """StreamSink.insert_stream for a remote repository."""
716
 
        repository.lock_write(token=lock_token)
717
 
        self.do_insert_stream_request(repository, resume_tokens)
718
 
 
719
 
    def do_insert_stream_request(self, repository, resume_tokens):
720
 
        tokens = [token for token in resume_tokens.split(' ') if token]
721
 
        self.tokens = tokens
722
 
        self.repository = repository
723
 
        self.queue = Queue.Queue()
724
 
        self.insert_thread = threading.Thread(target=self._inserter_thread)
725
 
        self.insert_thread.start()
726
 
 
727
 
    def do_chunk(self, body_stream_chunk):
728
 
        self.queue.put(body_stream_chunk)
729
 
 
730
 
    def _inserter_thread(self):
731
 
        try:
732
 
            src_format, stream = _byte_stream_to_stream(
733
 
                self.blocking_byte_stream())
734
 
            self.insert_result = self.repository._get_sink().insert_stream(
735
 
                stream, src_format, self.tokens)
736
 
            self.insert_ok = True
737
 
        except:
738
 
            self.insert_exception = sys.exc_info()
739
 
            self.insert_ok = False
740
 
 
741
 
    def blocking_byte_stream(self):
742
 
        while True:
743
 
            bytes = self.queue.get()
744
 
            if bytes is StopIteration:
745
 
                return
746
 
            else:
747
 
                yield bytes
748
 
 
749
 
    def do_end(self):
750
 
        self.queue.put(StopIteration)
751
 
        if self.insert_thread is not None:
752
 
            self.insert_thread.join()
753
 
        if not self.insert_ok:
754
 
            exc_info = self.insert_exception
755
 
            raise exc_info[0], exc_info[1], exc_info[2]
756
 
        write_group_tokens, missing_keys = self.insert_result
757
 
        if write_group_tokens or missing_keys:
758
 
            # bzip needed? missing keys should typically be a small set.
759
 
            # Should this be a streaming body response ?
760
 
            missing_keys = sorted(missing_keys)
761
 
            bytes = bencode.bencode((write_group_tokens, missing_keys))
762
 
            self.repository.unlock()
763
 
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
764
 
        else:
765
 
            self.repository.unlock()
766
 
            return SuccessfulSmartServerResponse(('ok', ))
767
 
 
768
 
 
769
 
class SmartServerRepositoryInsertStream_1_19(SmartServerRepositoryInsertStreamLocked):
770
 
    """Insert a record stream from a RemoteSink into a repository.
771
 
 
772
 
    Same as SmartServerRepositoryInsertStreamLocked, except:
773
 
     - the lock token argument is optional
774
 
     - servers that implement this verb accept 'inventory-delta' records in the
775
 
       stream.
776
 
 
777
 
    New in 1.19.
778
 
    """
779
 
 
780
 
    def do_repository_request(self, repository, resume_tokens, lock_token=None):
781
 
        """StreamSink.insert_stream for a remote repository."""
782
 
        SmartServerRepositoryInsertStreamLocked.do_repository_request(
783
 
            self, repository, resume_tokens, lock_token)
784
 
 
785
 
 
786
 
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
787
 
    """Insert a record stream from a RemoteSink into an unlocked repository.
788
 
 
789
 
    This is the same as SmartServerRepositoryInsertStreamLocked, except it
790
 
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
791
 
    like pack format) repository.
792
 
 
793
 
    New in 1.13.
794
 
    """
795
 
 
796
 
    def do_repository_request(self, repository, resume_tokens):
797
 
        """StreamSink.insert_stream for a remote repository."""
798
 
        repository.lock_write()
799
 
        self.do_insert_stream_request(repository, resume_tokens)
800
 
 
801