~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Vincent Ladeuil
  • Date: 2009-06-22 12:52:39 UTC
  • mto: (4471.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 4472.
  • Revision ID: v.ladeuil+lp@free.fr-20090622125239-kabo9smxt9c3vnir
Use a consistent scheme for naming pyrex source files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Server-side repository related request implmentations."""
18
18
 
19
19
import bz2
20
 
from cStringIO import StringIO
21
20
import os
 
21
import Queue
22
22
import sys
 
23
import tarfile
23
24
import tempfile
24
 
import tarfile
 
25
import threading
25
26
 
26
 
from bzrlib import errors
 
27
from bzrlib import (
 
28
    bencode,
 
29
    errors,
 
30
    graph,
 
31
    osutils,
 
32
    pack,
 
33
    )
27
34
from bzrlib.bzrdir import BzrDir
28
 
from bzrlib.pack import ContainerSerialiser
29
35
from bzrlib.smart.request import (
30
36
    FailedSmartServerResponse,
31
37
    SmartServerRequest,
32
38
    SuccessfulSmartServerResponse,
33
39
    )
34
 
from bzrlib.repository import _strip_NULL_ghosts
 
40
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
35
41
from bzrlib import revision as _mod_revision
 
42
from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
36
43
 
37
44
 
38
45
class SmartServerRepositoryRequest(SmartServerRequest):
40
47
 
41
48
    def do(self, path, *args):
42
49
        """Execute a repository request.
43
 
        
 
50
 
44
51
        All Repository requests take a path to the repository as their first
45
52
        argument.  The repository must be at the exact path given by the
46
53
        client - no searching is done.
63
70
        # is expected)
64
71
        return None
65
72
 
66
 
    def recreate_search(self, repository, recipe_bytes):
67
 
        lines = recipe_bytes.split('\n')
 
73
    def recreate_search(self, repository, search_bytes, discard_excess=False):
 
74
        """Recreate a search from its serialised form.
 
75
 
 
76
        :param discard_excess: If True, and the search refers to data we don't
 
77
            have, just silently accept that fact - the verb calling
 
78
            recreate_search trusts that clients will look for missing things
 
79
            they expected and get it from elsewhere.
 
80
        """
 
81
        lines = search_bytes.split('\n')
 
82
        if lines[0] == 'ancestry-of':
 
83
            heads = lines[1:]
 
84
            search_result = graph.PendingAncestryResult(heads, repository)
 
85
            return search_result, None
 
86
        elif lines[0] == 'search':
 
87
            return self.recreate_search_from_recipe(repository, lines[1:],
 
88
                discard_excess=discard_excess)
 
89
        else:
 
90
            return (None, FailedSmartServerResponse(('BadSearch',)))
 
91
 
 
92
    def recreate_search_from_recipe(self, repository, lines,
 
93
        discard_excess=False):
 
94
        """Recreate a specific revision search (vs a from-tip search).
 
95
 
 
96
        :param discard_excess: If True, and the search refers to data we don't
 
97
            have, just silently accept that fact - the verb calling
 
98
            recreate_search trusts that clients will look for missing things
 
99
            they expected and get it from elsewhere.
 
100
        """
68
101
        start_keys = set(lines[0].split(' '))
69
102
        exclude_keys = set(lines[1].split(' '))
70
103
        revision_count = int(lines[2])
79
112
                    break
80
113
                search.stop_searching_any(exclude_keys.intersection(next_revs))
81
114
            search_result = search.get_result()
82
 
            if search_result.get_recipe()[2] != revision_count:
 
115
            if (not discard_excess and
 
116
                search_result.get_recipe()[3] != revision_count):
83
117
                # we got back a different amount of data than expected, this
84
118
                # gets reported as NoSuchRevision, because less revisions
85
119
                # indicates missing revisions, and more should never happen as
86
120
                # the excludes list considers ghosts and ensures that ghost
87
121
                # filling races are not a problem.
88
122
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
89
 
            return (search, None)
 
123
            return (search_result, None)
90
124
        finally:
91
125
            repository.unlock()
92
126
 
105
139
 
106
140
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
107
141
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
108
 
    
 
142
 
 
143
    no_extra_results = False
 
144
 
109
145
    def do_repository_request(self, repository, *revision_ids):
110
146
        """Get parent details for some revisions.
111
 
        
 
147
 
112
148
        All the parents for revision_ids are returned. Additionally up to 64KB
113
149
        of additional parent data found by performing a breadth first search
114
150
        from revision_ids is returned. The verb takes a body containing the
115
151
        current search state, see do_body for details.
116
152
 
 
153
        If 'include-missing:' is in revision_ids, ghosts encountered in the
 
154
        graph traversal for getting parent data are included in the result with
 
155
        a prefix of 'missing:'.
 
156
 
117
157
        :param repository: The repository to query in.
118
158
        :param revision_ids: The utf8 encoded revision_id to answer for.
119
159
        """
138
178
    def _do_repository_request(self, body_bytes):
139
179
        repository = self._repository
140
180
        revision_ids = set(self._revision_ids)
141
 
        search, error = self.recreate_search(repository, body_bytes)
 
181
        include_missing = 'include-missing:' in revision_ids
 
182
        if include_missing:
 
183
            revision_ids.remove('include-missing:')
 
184
        body_lines = body_bytes.split('\n')
 
185
        search_result, error = self.recreate_search_from_recipe(
 
186
            repository, body_lines)
142
187
        if error is not None:
143
188
            return error
144
189
        # TODO might be nice to start up the search again; but thats not
145
190
        # written or tested yet.
146
 
        client_seen_revs = set(search.get_result().get_keys())
 
191
        client_seen_revs = set(search_result.get_keys())
147
192
        # Always include the requested ids.
148
193
        client_seen_revs.difference_update(revision_ids)
149
194
        lines = []
156
201
        while next_revs:
157
202
            queried_revs.update(next_revs)
158
203
            parent_map = repo_graph.get_parent_map(next_revs)
 
204
            current_revs = next_revs
159
205
            next_revs = set()
160
 
            for revision_id, parents in parent_map.iteritems():
161
 
                # adjust for the wire
162
 
                if parents == (_mod_revision.NULL_REVISION,):
163
 
                    parents = ()
164
 
                # prepare the next query
165
 
                next_revs.update(parents)
166
 
                if revision_id not in client_seen_revs:
 
206
            for revision_id in current_revs:
 
207
                missing_rev = False
 
208
                parents = parent_map.get(revision_id)
 
209
                if parents is not None:
 
210
                    # adjust for the wire
 
211
                    if parents == (_mod_revision.NULL_REVISION,):
 
212
                        parents = ()
 
213
                    # prepare the next query
 
214
                    next_revs.update(parents)
 
215
                    encoded_id = revision_id
 
216
                else:
 
217
                    missing_rev = True
 
218
                    encoded_id = "missing:" + revision_id
 
219
                    parents = []
 
220
                if (revision_id not in client_seen_revs and
 
221
                    (not missing_rev or include_missing)):
167
222
                    # Client does not have this revision, give it to it.
168
223
                    # add parents to the result
169
 
                    result[revision_id] = parents
 
224
                    result[encoded_id] = parents
170
225
                    # Approximate the serialized cost of this revision_id.
171
 
                    size_so_far += 2 + len(revision_id) + sum(map(len, parents))
 
226
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
172
227
            # get all the directly asked for parents, and then flesh out to
173
228
            # 64K (compressed) or so. We do one level of depth at a time to
174
229
            # stay in sync with the client. The 250000 magic number is
175
230
            # estimated compression ratio taken from bzr.dev itself.
176
 
            if first_loop_done and size_so_far > 250000:
 
231
            if self.no_extra_results or (
 
232
                first_loop_done and size_so_far > 250000):
177
233
                next_revs = set()
178
234
                break
179
235
            # don't query things we've already queried
190
246
 
191
247
 
192
248
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
193
 
    
 
249
 
194
250
    def do_readlocked_repository_request(self, repository, revision_id):
195
251
        """Return the result of repository.get_revision_graph(revision_id).
196
252
 
197
253
        Deprecated as of bzr 1.4, but supported for older clients.
198
 
        
 
254
 
199
255
        :param repository: The repository to query in.
200
256
        :param revision_id: The utf8 encoded revision_id to get a graph from.
201
257
        :return: A smart server response where the body contains an utf8
227
283
        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
228
284
 
229
285
 
230
 
class SmartServerRepositoryGraphHeads(SmartServerRepositoryRequest):
 
286
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
231
287
 
232
 
    def do_repository_request(self, repository, *keys):
233
 
        repository.lock_read()
 
288
    def do_readlocked_repository_request(self, repository, revno,
 
289
            known_pair):
 
290
        """Find the revid for a given revno, given a known revno/revid pair.
 
291
        
 
292
        New in 1.17.
 
293
        """
234
294
        try:
235
 
            graph = repository.get_graph()
236
 
            heads = tuple(graph.heads(keys))
237
 
        finally:
238
 
            repository.unlock()
239
 
        return SuccessfulSmartServerResponse(heads)
 
295
            found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
 
296
        except errors.RevisionNotPresent, err:
 
297
            if err.revision_id != known_pair[1]:
 
298
                raise AssertionError(
 
299
                    'get_rev_id_for_revno raised RevisionNotPresent for '
 
300
                    'non-initial revision: ' + err.revision_id)
 
301
            return FailedSmartServerResponse(
 
302
                ('nosuchrevision', err.revision_id))
 
303
        if found_flag:
 
304
            return SuccessfulSmartServerResponse(('ok', result))
 
305
        else:
 
306
            earliest_revno, earliest_revid = result
 
307
            return SuccessfulSmartServerResponse(
 
308
                ('history-incomplete', earliest_revno, earliest_revid))
240
309
 
241
310
 
242
311
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
269
338
              firstrev: 1234.230 0
270
339
              latestrev: 345.700 3600
271
340
              revisions: 2
272
 
              size:45
273
341
 
274
342
              But containing only fields returned by the gather_stats() call
275
343
        """
336
404
        return SuccessfulSmartServerResponse(('ok', token))
337
405
 
338
406
 
 
407
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
 
408
 
 
409
    def do_repository_request(self, repository, to_network_name):
 
410
        """Get a stream for inserting into a to_format repository.
 
411
 
 
412
        :param repository: The repository to stream from.
 
413
        :param to_network_name: The network name of the format of the target
 
414
            repository.
 
415
        """
 
416
        self._to_format = network_format_registry.get(to_network_name)
 
417
        return None # Signal that we want a body.
 
418
 
 
419
    def do_body(self, body_bytes):
 
420
        repository = self._repository
 
421
        repository.lock_read()
 
422
        try:
 
423
            search_result, error = self.recreate_search(repository, body_bytes,
 
424
                discard_excess=True)
 
425
            if error is not None:
 
426
                repository.unlock()
 
427
                return error
 
428
            source = repository._get_source(self._to_format)
 
429
            stream = source.get_stream(search_result)
 
430
        except Exception:
 
431
            exc_info = sys.exc_info()
 
432
            try:
 
433
                # On non-error, unlocking is done by the body stream handler.
 
434
                repository.unlock()
 
435
            finally:
 
436
                raise exc_info[0], exc_info[1], exc_info[2]
 
437
        return SuccessfulSmartServerResponse(('ok',),
 
438
            body_stream=self.body_stream(stream, repository))
 
439
 
 
440
    def body_stream(self, stream, repository):
 
441
        byte_stream = _stream_to_byte_stream(stream, repository._format)
 
442
        try:
 
443
            for bytes in byte_stream:
 
444
                yield bytes
 
445
        except errors.RevisionNotPresent, e:
 
446
            # This shouldn't be able to happen, but as we don't buffer
 
447
            # everything it can in theory happen.
 
448
            repository.unlock()
 
449
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
450
        else:
 
451
            repository.unlock()
 
452
 
 
453
 
 
454
def _stream_to_byte_stream(stream, src_format):
 
455
    """Convert a record stream to a self delimited byte stream."""
 
456
    pack_writer = pack.ContainerSerialiser()
 
457
    yield pack_writer.begin()
 
458
    yield pack_writer.bytes_record(src_format.network_name(), '')
 
459
    for substream_type, substream in stream:
 
460
        for record in substream:
 
461
            if record.storage_kind in ('chunked', 'fulltext'):
 
462
                serialised = record_to_fulltext_bytes(record)
 
463
            elif record.storage_kind == 'absent':
 
464
                raise ValueError("Absent factory for %s" % (record.key,))
 
465
            else:
 
466
                serialised = record.get_bytes_as(record.storage_kind)
 
467
            if serialised:
 
468
                # Some streams embed the whole stream into the wire
 
469
                # representation of the first record, which means that
 
470
                # later records have no wire representation: we skip them.
 
471
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
 
472
    yield pack_writer.end()
 
473
 
 
474
 
 
475
def _byte_stream_to_stream(byte_stream):
 
476
    """Convert a byte stream into a format and a stream.
 
477
 
 
478
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
 
479
    :return: (RepositoryFormat, stream_generator)
 
480
    """
 
481
    stream_decoder = pack.ContainerPushParser()
 
482
    def record_stream():
 
483
        """Closure to return the substreams."""
 
484
        # May have fully parsed records already.
 
485
        for record in stream_decoder.read_pending_records():
 
486
            record_names, record_bytes = record
 
487
            record_name, = record_names
 
488
            substream_type = record_name[0]
 
489
            substream = NetworkRecordStream([record_bytes])
 
490
            yield substream_type, substream.read()
 
491
        for bytes in byte_stream:
 
492
            stream_decoder.accept_bytes(bytes)
 
493
            for record in stream_decoder.read_pending_records():
 
494
                record_names, record_bytes = record
 
495
                record_name, = record_names
 
496
                substream_type = record_name[0]
 
497
                substream = NetworkRecordStream([record_bytes])
 
498
                yield substream_type, substream.read()
 
499
    for bytes in byte_stream:
 
500
        stream_decoder.accept_bytes(bytes)
 
501
        for record in stream_decoder.read_pending_records(max=1):
 
502
            record_names, src_format_name = record
 
503
            src_format = network_format_registry.get(src_format_name)
 
504
            return src_format, record_stream()
 
505
 
 
506
 
339
507
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
340
508
 
341
509
    def do_repository_request(self, repository, token):
348
516
        return SuccessfulSmartServerResponse(('ok',))
349
517
 
350
518
 
 
519
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
 
520
 
 
521
    def do_repository_request(self, repository, str_bool_new_value):
 
522
        if str_bool_new_value == 'True':
 
523
            new_value = True
 
524
        else:
 
525
            new_value = False
 
526
        repository.set_make_working_trees(new_value)
 
527
        return SuccessfulSmartServerResponse(('ok',))
 
528
 
 
529
 
351
530
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
352
531
    """Get the raw repository files as a tarball.
353
532
 
354
533
    The returned tarball contains a .bzr control directory which in turn
355
534
    contains a repository.
356
 
    
357
 
    This takes one parameter, compression, which currently must be 
 
535
 
 
536
    This takes one parameter, compression, which currently must be
358
537
    "", "gz", or "bz2".
359
538
 
360
539
    This is used to implement the Repository.copy_content_into operation.
361
540
    """
362
541
 
363
542
    def do_repository_request(self, repository, compression):
364
 
        from bzrlib import osutils
365
 
        repo_transport = repository.control_files._transport
366
543
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
367
544
        try:
368
545
            controldir_name = tmp_dirname + '/.bzr'
371
548
            osutils.rmtree(tmp_dirname)
372
549
 
373
550
    def _copy_to_tempdir(self, from_repo):
374
 
        tmp_dirname = tempfile.mkdtemp(prefix='tmpbzrclone')
 
551
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
375
552
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
376
553
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
377
554
        from_repo.copy_content_into(tmp_repo)
384
561
            # all finished; write the tempfile out to the network
385
562
            temp.seek(0)
386
563
            return SuccessfulSmartServerResponse(('ok',), temp.read())
387
 
            # FIXME: Don't read the whole thing into memory here; rather stream it
388
 
            # out from the file onto the network. mbp 20070411
 
564
            # FIXME: Don't read the whole thing into memory here; rather stream
 
565
            # it out from the file onto the network. mbp 20070411
389
566
        finally:
390
567
            temp.close()
391
568
 
408
585
            tarball.close()
409
586
 
410
587
 
411
 
class SmartServerRepositoryStreamKnitDataForRevisions(SmartServerRepositoryRequest):
412
 
    """Bzr <= 1.1 streaming pull, buffers all data on server."""
413
 
 
414
 
    def do_repository_request(self, repository, *revision_ids):
415
 
        repository.lock_read()
416
 
        try:
417
 
            return self._do_repository_request(repository, revision_ids)
418
 
        finally:
419
 
            repository.unlock()
420
 
 
421
 
    def _do_repository_request(self, repository, revision_ids):
422
 
        stream = repository.get_data_stream_for_search(
423
 
            repository.revision_ids_to_search_result(set(revision_ids)))
424
 
        buffer = StringIO()
425
 
        pack = ContainerSerialiser()
426
 
        buffer.write(pack.begin())
427
 
        try:
428
 
            for name_tuple, bytes in stream:
429
 
                buffer.write(pack.bytes_record(bytes, [name_tuple]))
430
 
        except errors.RevisionNotPresent, e:
431
 
            return FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
432
 
        buffer.write(pack.end())
433
 
        return SuccessfulSmartServerResponse(('ok',), buffer.getvalue())
434
 
 
435
 
 
436
 
class SmartServerRepositoryStreamRevisionsChunked(SmartServerRepositoryRequest):
437
 
    """Bzr 1.1+ streaming pull."""
438
 
 
439
 
    def do_body(self, body_bytes):
440
 
        repository = self._repository
441
 
        repository.lock_read()
442
 
        try:
443
 
            search, error = self.recreate_search(repository, body_bytes)
444
 
            if error is not None:
445
 
                repository.unlock()
446
 
                return error
447
 
            stream = repository.get_data_stream_for_search(search.get_result())
448
 
        except Exception:
449
 
            # On non-error, unlocking is done by the body stream handler.
450
 
            repository.unlock()
451
 
            raise
452
 
        return SuccessfulSmartServerResponse(('ok',),
453
 
            body_stream=self.body_stream(stream, repository))
454
 
 
455
 
    def body_stream(self, stream, repository):
456
 
        pack = ContainerSerialiser()
457
 
        yield pack.begin()
458
 
        try:
459
 
            try:
460
 
                for name_tuple, bytes in stream:
461
 
                    yield pack.bytes_record(bytes, [name_tuple])
462
 
            except:
463
 
                # Undo the lock_read that that happens once the iterator from
464
 
                # get_data_stream is started.
465
 
                repository.unlock()
466
 
                raise
467
 
        except errors.RevisionNotPresent, e:
468
 
            # This shouldn't be able to happen, but as we don't buffer
469
 
            # everything it can in theory happen.
470
 
            repository.unlock()
471
 
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
588
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
 
589
    """Insert a record stream from a RemoteSink into a repository.
 
590
 
 
591
    This gets bytes pushed to it by the network infrastructure and turns that
 
592
    into a bytes iterator using a thread. That is then processed by
 
593
    _byte_stream_to_stream.
 
594
 
 
595
    New in 1.14.
 
596
    """
 
597
 
 
598
    def do_repository_request(self, repository, resume_tokens, lock_token):
 
599
        """StreamSink.insert_stream for a remote repository."""
 
600
        repository.lock_write(token=lock_token)
 
601
        self.do_insert_stream_request(repository, resume_tokens)
 
602
 
 
603
    def do_insert_stream_request(self, repository, resume_tokens):
 
604
        tokens = [token for token in resume_tokens.split(' ') if token]
 
605
        self.tokens = tokens
 
606
        self.repository = repository
 
607
        self.queue = Queue.Queue()
 
608
        self.insert_thread = threading.Thread(target=self._inserter_thread)
 
609
        self.insert_thread.start()
 
610
 
 
611
    def do_chunk(self, body_stream_chunk):
 
612
        self.queue.put(body_stream_chunk)
 
613
 
 
614
    def _inserter_thread(self):
 
615
        try:
 
616
            src_format, stream = _byte_stream_to_stream(
 
617
                self.blocking_byte_stream())
 
618
            self.insert_result = self.repository._get_sink().insert_stream(
 
619
                stream, src_format, self.tokens)
 
620
            self.insert_ok = True
 
621
        except:
 
622
            self.insert_exception = sys.exc_info()
 
623
            self.insert_ok = False
 
624
 
 
625
    def blocking_byte_stream(self):
 
626
        while True:
 
627
            bytes = self.queue.get()
 
628
            if bytes is StopIteration:
 
629
                return
 
630
            else:
 
631
                yield bytes
 
632
 
 
633
    def do_end(self):
 
634
        self.queue.put(StopIteration)
 
635
        if self.insert_thread is not None:
 
636
            self.insert_thread.join()
 
637
        if not self.insert_ok:
 
638
            exc_info = self.insert_exception
 
639
            raise exc_info[0], exc_info[1], exc_info[2]
 
640
        write_group_tokens, missing_keys = self.insert_result
 
641
        if write_group_tokens or missing_keys:
 
642
            # bzip needed? missing keys should typically be a small set.
 
643
            # Should this be a streaming body response ?
 
644
            missing_keys = sorted(missing_keys)
 
645
            bytes = bencode.bencode((write_group_tokens, missing_keys))
 
646
            self.repository.unlock()
 
647
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
472
648
        else:
473
 
            repository.unlock()
474
 
            pack.end()
 
649
            self.repository.unlock()
 
650
            return SuccessfulSmartServerResponse(('ok', ))
 
651
 
 
652
 
 
653
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
 
654
    """Insert a record stream from a RemoteSink into an unlocked repository.
 
655
 
 
656
    This is the same as SmartServerRepositoryInsertStreamLocked, except it
 
657
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
 
658
    like pack format) repository.
 
659
 
 
660
    New in 1.13.
 
661
    """
 
662
 
 
663
    def do_repository_request(self, repository, resume_tokens):
 
664
        """StreamSink.insert_stream for a remote repository."""
 
665
        repository.lock_write()
 
666
        self.do_insert_stream_request(repository, resume_tokens)
 
667
 
475
668