~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Andrew Bennetts
  • Date: 2008-03-27 06:10:18 UTC
  • mfrom: (3309 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3320.
  • Revision ID: andrew.bennetts@canonical.com-20080327061018-dxztpxyv6yoeg3am
Merge from bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Server-side repository related request implmentations."""
18
18
 
 
19
import bz2
19
20
from cStringIO import StringIO
20
21
import os
21
22
import sys
30
31
    SmartServerRequest,
31
32
    SuccessfulSmartServerResponse,
32
33
    )
 
34
from bzrlib import revision as _mod_revision
33
35
 
34
36
 
35
37
class SmartServerRepositoryRequest(SmartServerRequest):
50
52
        """
51
53
        transport = self.transport_from_client_path(path)
52
54
        bzrdir = BzrDir.open_from_transport(transport)
53
 
        repository = bzrdir.open_repository()
54
 
        return self.do_repository_request(repository, *args)
 
55
        # Save the repository for use with do_body.
 
56
        self._repository = bzrdir.open_repository()
 
57
        return self.do_repository_request(self._repository, *args)
 
58
 
 
59
    def do_repository_request(self, repository, *args):
 
60
        """Override to provide an implementation for a verb."""
 
61
        # No-op for verbs that take bodies (None as a result indicates a body
 
62
        # is expected)
 
63
        return None
 
64
 
 
65
    def recreate_search(self, repository, recipe_bytes):
 
66
        lines = recipe_bytes.split('\n')
 
67
        start_keys = set(lines[0].split(' '))
 
68
        exclude_keys = set(lines[1].split(' '))
 
69
        revision_count = int(lines[2])
 
70
        repository.lock_read()
 
71
        try:
 
72
            search = repository.get_graph()._make_breadth_first_searcher(
 
73
                start_keys)
 
74
            while True:
 
75
                try:
 
76
                    next_revs = search.next()
 
77
                except StopIteration:
 
78
                    break
 
79
                search.stop_searching_any(exclude_keys.intersection(next_revs))
 
80
            search_result = search.get_result()
 
81
            if search_result.get_recipe()[2] != revision_count:
 
82
                # we got back a different amount of data than expected, this
 
83
                # gets reported as NoSuchRevision, because less revisions
 
84
                # indicates missing revisions, and more should never happen as
 
85
                # the excludes list considers ghosts and ensures that ghost
 
86
                # filling races are not a problem.
 
87
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
88
            return (search, None)
 
89
        finally:
 
90
            repository.unlock()
 
91
 
 
92
 
 
93
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
 
94
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
 
95
    
 
96
    def do_repository_request(self, repository, *revision_ids):
 
97
        """Get parent details for some revisions.
 
98
        
 
99
        All the parents for revision_ids are returned. Additionally up to 64KB
 
100
        of additional parent data found by performing a breadth first search
 
101
        from revision_ids is returned. The verb takes a body containing the
 
102
        current search state, see do_body for details.
 
103
 
 
104
        :param repository: The repository to query in.
 
105
        :param revision_ids: The utf8 encoded revision_id to answer for.
 
106
        """
 
107
        self._revision_ids = revision_ids
 
108
        return None # Signal that we want a body.
 
109
 
 
110
    def do_body(self, body_bytes):
 
111
        """Process the current search state and perform the parent lookup.
 
112
 
 
113
        :return: A smart server response where the body contains an utf8
 
114
            encoded flattened list of the parents of the revisions (the same
 
115
            format as Repository.get_revision_graph) which has been bz2
 
116
            compressed.
 
117
        """
 
118
        repository = self._repository
 
119
        repository.lock_read()
 
120
        try:
 
121
            return self._do_repository_request(body_bytes)
 
122
        finally:
 
123
            repository.unlock()
 
124
 
 
125
    def _do_repository_request(self, body_bytes):
 
126
        repository = self._repository
 
127
        revision_ids = set(self._revision_ids)
 
128
        search, error = self.recreate_search(repository, body_bytes)
 
129
        if error is not None:
 
130
            return error
 
131
        # TODO might be nice to start up the search again; but thats not
 
132
        # written or tested yet.
 
133
        client_seen_revs = set(search.get_result().get_keys())
 
134
        # Always include the requested ids.
 
135
        client_seen_revs.difference_update(revision_ids)
 
136
        lines = []
 
137
        repo_graph = repository.get_graph()
 
138
        result = {}
 
139
        queried_revs = set()
 
140
        size_so_far = 0
 
141
        next_revs = revision_ids
 
142
        first_loop_done = False
 
143
        while next_revs:
 
144
            queried_revs.update(next_revs)
 
145
            parent_map = repo_graph.get_parent_map(next_revs)
 
146
            next_revs = set()
 
147
            for revision_id, parents in parent_map.iteritems():
 
148
                # adjust for the wire
 
149
                if parents == (_mod_revision.NULL_REVISION,):
 
150
                    parents = ()
 
151
                # prepare the next query
 
152
                next_revs.update(parents)
 
153
                if revision_id not in client_seen_revs:
 
154
                    # Client does not have this revision, give it to it.
 
155
                    # add parents to the result
 
156
                    result[revision_id] = parents
 
157
                    # Approximate the serialized cost of this revision_id.
 
158
                    size_so_far += 2 + len(revision_id) + sum(map(len, parents))
 
159
            # get all the directly asked for parents, and then flesh out to
 
160
            # 64K (compressed) or so. We do one level of depth at a time to
 
161
            # stay in sync with the client. The 250000 magic number is
 
162
            # estimated compression ratio taken from bzr.dev itself.
 
163
            if first_loop_done and size_so_far > 250000:
 
164
                next_revs = set()
 
165
                break
 
166
            # don't query things we've already queried
 
167
            next_revs.difference_update(queried_revs)
 
168
            first_loop_done = True
 
169
 
 
170
        # sorting trivially puts lexographically similar revision ids together.
 
171
        # Compression FTW.
 
172
        for revision, parents in sorted(result.items()):
 
173
            lines.append(' '.join((revision, ) + tuple(parents)))
 
174
 
 
175
        return SuccessfulSmartServerResponse(
 
176
            ('ok', ), bz2.compress('\n'.join(lines)))
55
177
 
56
178
 
57
179
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryRequest):
251
373
 
252
374
 
253
375
class SmartServerRepositoryStreamKnitDataForRevisions(SmartServerRepositoryRequest):
 
376
    """Bzr <= 1.1 streaming pull, buffers all data on server."""
254
377
 
255
378
    def do_repository_request(self, repository, *revision_ids):
256
379
        repository.lock_read()
260
383
            repository.unlock()
261
384
 
262
385
    def _do_repository_request(self, repository, revision_ids):
263
 
        stream = repository.get_data_stream(revision_ids)
 
386
        stream = repository.get_data_stream_for_search(
 
387
            repository.revision_ids_to_search_result(set(revision_ids)))
264
388
        buffer = StringIO()
265
389
        pack = ContainerSerialiser()
266
390
        buffer.write(pack.begin())
269
393
                for name_tuple, bytes in stream:
270
394
                    buffer.write(pack.bytes_record(bytes, [name_tuple]))
271
395
            except:
272
 
                # Undo the lock_read that that happens once the iterator from
 
396
                # Undo the lock_read that happens once the iterator from
273
397
                # get_data_stream is started.
274
398
                repository.unlock()
275
399
                raise
280
404
 
281
405
 
282
406
class SmartServerRepositoryStreamRevisionsChunked(SmartServerRepositoryRequest):
 
407
    """Bzr 1.1+ streaming pull."""
283
408
 
284
 
    def do_repository_request(self, repository, *revision_ids):
 
409
    def do_body(self, body_bytes):
 
410
        repository = self._repository
285
411
        repository.lock_read()
286
412
        try:
287
 
            stream = repository.get_data_stream(revision_ids)
 
413
            search, error = self.recreate_search(repository, body_bytes)
 
414
            if error is not None:
 
415
                repository.unlock()
 
416
                return error
 
417
            stream = repository.get_data_stream_for_search(search.get_result())
288
418
        except Exception:
 
419
            # On non-error, unlocking is done by the body stream handler.
289
420
            repository.unlock()
290
421
            raise
291
422
        return SuccessfulSmartServerResponse(('ok',),
304
435
                repository.unlock()
305
436
                raise
306
437
        except errors.RevisionNotPresent, e:
 
438
            # This shouldn't be able to happen, but as we don't buffer
 
439
            # everything it can in theory happen.
307
440
            repository.unlock()
308
441
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
309
442
        else: