~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Robert Collins
  • Date: 2008-02-13 03:30:01 UTC
  • mfrom: (3221 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3224.
  • Revision ID: robertc@robertcollins.net-20080213033001-rw70ul0zb02ph856
Merge to fix conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Server-side repository related request implmentations."""
18
18
 
 
19
import bz2
19
20
from cStringIO import StringIO
20
21
import os
21
22
import sys
24
25
 
25
26
from bzrlib import errors
26
27
from bzrlib.bzrdir import BzrDir
27
 
from bzrlib.pack import ContainerWriter
 
28
from bzrlib.pack import ContainerSerialiser
28
29
from bzrlib.smart.request import (
29
30
    FailedSmartServerResponse,
30
31
    SmartServerRequest,
31
32
    SuccessfulSmartServerResponse,
32
33
    )
 
34
from bzrlib import revision as _mod_revision
33
35
 
34
36
 
35
37
class SmartServerRepositoryRequest(SmartServerRequest):
47
49
        """
48
50
        transport = self._backing_transport.clone(path)
49
51
        bzrdir = BzrDir.open_from_transport(transport)
50
 
        repository = bzrdir.open_repository()
51
 
        return self.do_repository_request(repository, *args)
 
52
        # Save the repository for use with do_body.
 
53
        self._repository = bzrdir.open_repository()
 
54
        return self.do_repository_request(self._repository, *args)
 
55
 
 
56
    def do_repository_request(self, repository, *args):
 
57
        """Override to provide an implementation for a verb."""
 
58
        # No-op for verbs that take bodies (None as a result indicates a body
 
59
        # is expected)
 
60
        return None
 
61
 
 
62
    def recreate_search(self, repository, recipe_bytes):
 
63
        lines = recipe_bytes.split('\n')
 
64
        start_keys = set(lines[0].split(' '))
 
65
        exclude_keys = set(lines[1].split(' '))
 
66
        revision_count = int(lines[2])
 
67
        repository.lock_read()
 
68
        try:
 
69
            search = repository.get_graph()._make_breadth_first_searcher(
 
70
                start_keys)
 
71
            while True:
 
72
                try:
 
73
                    next_revs = search.next()
 
74
                except StopIteration:
 
75
                    break
 
76
                search.stop_searching_any(exclude_keys.intersection(next_revs))
 
77
            search_result = search.get_result()
 
78
            if search_result.get_recipe()[2] != revision_count:
 
79
                # we got back a different amount of data than expected, this
 
80
                # gets reported as NoSuchRevision, because less revisions
 
81
                # indicates missing revisions, and more should never happen as
 
82
                # the excludes list considers ghosts and ensures that ghost
 
83
                # filling races are not a problem.
 
84
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
85
            return (search, None)
 
86
        finally:
 
87
            repository.unlock()
 
88
 
 
89
 
 
90
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
 
91
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
 
92
    
 
93
    def do_repository_request(self, repository, *revision_ids):
 
94
        """Get parent details for some revisions.
 
95
        
 
96
        All the parents for revision_ids are returned. Additionally up to 64KB
 
97
        of additional parent data found by performing a breadth first search
 
98
        from revision_ids is returned. The verb takes a body containing the
 
99
        current search state, see do_body for details.
 
100
 
 
101
        :param repository: The repository to query in.
 
102
        :param revision_ids: The utf8 encoded revision_id to answer for.
 
103
        """
 
104
        self._revision_ids = revision_ids
 
105
        return None # Signal that we want a body.
 
106
 
 
107
    def do_body(self, body_bytes):
 
108
        """Process the current search state and perform the parent lookup.
 
109
 
 
110
        :return: A smart server response where the body contains an utf8
 
111
            encoded flattened list of the parents of the revisions (the same
 
112
            format as Repository.get_revision_graph) which has been bz2
 
113
            compressed.
 
114
        """
 
115
        repository = self._repository
 
116
        repository.lock_read()
 
117
        try:
 
118
            return self._do_repository_request(body_bytes)
 
119
        finally:
 
120
            repository.unlock()
 
121
 
 
122
    def _do_repository_request(self, body_bytes):
 
123
        repository = self._repository
 
124
        revision_ids = set(self._revision_ids)
 
125
        search, error = self.recreate_search(repository, body_bytes)
 
126
        if error is not None:
 
127
            return error
 
128
        # TODO might be nice to start up the search again; but thats not
 
129
        # written or tested yet.
 
130
        client_seen_revs = set(search.get_result().get_keys())
 
131
        # Always include the requested ids.
 
132
        client_seen_revs.difference_update(revision_ids)
 
133
        lines = []
 
134
        repo_graph = repository.get_graph()
 
135
        result = {}
 
136
        queried_revs = set()
 
137
        size_so_far = 0
 
138
        next_revs = revision_ids
 
139
        first_loop_done = False
 
140
        while next_revs:
 
141
            queried_revs.update(next_revs)
 
142
            parent_map = repo_graph.get_parent_map(next_revs)
 
143
            next_revs = set()
 
144
            for revision_id, parents in parent_map.iteritems():
 
145
                # adjust for the wire
 
146
                if parents == (_mod_revision.NULL_REVISION,):
 
147
                    parents = ()
 
148
                # prepare the next query
 
149
                next_revs.update(parents)
 
150
                if revision_id not in client_seen_revs:
 
151
                    # Client does not have this revision, give it to it.
 
152
                    # add parents to the result
 
153
                    result[revision_id] = parents
 
154
                    # Approximate the serialized cost of this revision_id.
 
155
                    size_so_far += 2 + len(revision_id) + sum(map(len, parents))
 
156
            # get all the directly asked for parents, and then flesh out to
 
157
            # 64K (compressed) or so. We do one level of depth at a time to
 
158
            # stay in sync with the client. The 250000 magic number is
 
159
            # estimated compression ratio taken from bzr.dev itself.
 
160
            if first_loop_done and size_so_far > 250000:
 
161
                next_revs = set()
 
162
                break
 
163
            # don't query things we've already queried
 
164
            next_revs.difference_update(queried_revs)
 
165
            first_loop_done = True
 
166
 
 
167
        # sorting trivially puts lexographically similar revision ids together.
 
168
        # Compression FTW.
 
169
        for revision, parents in sorted(result.items()):
 
170
            lines.append(' '.join((revision, ) + tuple(parents)))
 
171
 
 
172
        return SuccessfulSmartServerResponse(
 
173
            ('ok', ), bz2.compress('\n'.join(lines)))
52
174
 
53
175
 
54
176
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryRequest):
168
290
        except errors.LockFailed, e:
169
291
            return FailedSmartServerResponse(('LockFailed',
170
292
                str(e.lock), str(e.why)))
171
 
        repository.leave_lock_in_place()
 
293
        if token is not None:
 
294
            repository.leave_lock_in_place()
172
295
        repository.unlock()
173
296
        if token is None:
174
297
            token = ''
247
370
 
248
371
 
249
372
class SmartServerRepositoryStreamKnitDataForRevisions(SmartServerRepositoryRequest):
 
373
    """Bzr <= 1.1 streaming pull, buffers all data on server."""
250
374
 
251
375
    def do_repository_request(self, repository, *revision_ids):
252
 
        stream = repository.get_data_stream(revision_ids)
253
 
        filelike = StringIO()
254
 
        pack = ContainerWriter(filelike.write)
255
 
        pack.begin()
 
376
        repository.lock_read()
 
377
        try:
 
378
            return self._do_repository_request(repository, revision_ids)
 
379
        finally:
 
380
            repository.unlock()
 
381
 
 
382
    def _do_repository_request(self, repository, revision_ids):
 
383
        stream = repository.get_data_stream_for_search(
 
384
            repository.revision_ids_to_search_result(set(revision_ids)))
 
385
        buffer = StringIO()
 
386
        pack = ContainerSerialiser()
 
387
        buffer.write(pack.begin())
256
388
        try:
257
389
            for name_tuple, bytes in stream:
258
 
                pack.add_bytes_record(bytes, [name_tuple])
 
390
                buffer.write(pack.bytes_record(bytes, [name_tuple]))
259
391
        except errors.RevisionNotPresent, e:
260
392
            return FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
393
        buffer.write(pack.end())
 
394
        return SuccessfulSmartServerResponse(('ok',), buffer.getvalue())
 
395
 
 
396
 
 
397
class SmartServerRepositoryStreamRevisionsChunked(SmartServerRepositoryRequest):
 
398
    """Bzr 1.1+ streaming pull."""
 
399
 
 
400
    def do_body(self, body_bytes):
 
401
        repository = self._repository
 
402
        repository.lock_read()
 
403
        try:
 
404
            search, error = self.recreate_search(repository, body_bytes)
 
405
            if error is not None:
 
406
                return error
 
407
            stream = repository.get_data_stream_for_search(search.get_result())
 
408
        except Exception:
 
409
            # On non-error, unlocking is done by the body stream handler.
 
410
            repository.unlock()
 
411
            raise
 
412
        return SuccessfulSmartServerResponse(('ok',),
 
413
            body_stream=self.body_stream(stream, repository))
 
414
 
 
415
    def body_stream(self, stream, repository):
 
416
        pack = ContainerSerialiser()
 
417
        yield pack.begin()
 
418
        try:
 
419
            for name_tuple, bytes in stream:
 
420
                yield pack.bytes_record(bytes, [name_tuple])
 
421
        except errors.RevisionNotPresent, e:
 
422
            # This shouldn't be able to happen, but as we don't buffer
 
423
            # everything it can in theory happen.
 
424
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
425
        repository.unlock()
261
426
        pack.end()
262
 
        return SuccessfulSmartServerResponse(('ok',), filelike.getvalue())
263
427