~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Andrew Bennetts
  • Date: 2009-07-27 05:35:00 UTC
  • mfrom: (4570 +trunk)
  • mto: (4634.6.29 2.0)
  • mto: This revision was merged to the branch mainline in revision 4680.
  • Revision ID: andrew.bennetts@canonical.com-20090727053500-q76zsn2dx33jhmj5
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Server-side repository related request implmentations."""
18
18
 
19
19
import bz2
20
20
import os
21
21
import Queue
22
 
import struct
23
22
import sys
24
23
import tarfile
25
24
import tempfile
26
25
import threading
27
26
 
28
27
from bzrlib import (
 
28
    bencode,
29
29
    errors,
 
30
    graph,
30
31
    osutils,
31
32
    pack,
32
33
    )
38
39
    )
39
40
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
40
41
from bzrlib import revision as _mod_revision
41
 
from bzrlib.util import bencode
42
42
from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
43
43
 
44
44
 
70
70
        # is expected)
71
71
        return None
72
72
 
73
 
    def recreate_search(self, repository, recipe_bytes):
74
 
        lines = recipe_bytes.split('\n')
 
73
    def recreate_search(self, repository, search_bytes, discard_excess=False):
 
74
        """Recreate a search from its serialised form.
 
75
 
 
76
        :param discard_excess: If True, and the search refers to data we don't
 
77
            have, just silently accept that fact - the verb calling
 
78
            recreate_search trusts that clients will look for missing things
 
79
            they expected and get it from elsewhere.
 
80
        """
 
81
        lines = search_bytes.split('\n')
 
82
        if lines[0] == 'ancestry-of':
 
83
            heads = lines[1:]
 
84
            search_result = graph.PendingAncestryResult(heads, repository)
 
85
            return search_result, None
 
86
        elif lines[0] == 'search':
 
87
            return self.recreate_search_from_recipe(repository, lines[1:],
 
88
                discard_excess=discard_excess)
 
89
        else:
 
90
            return (None, FailedSmartServerResponse(('BadSearch',)))
 
91
 
 
92
    def recreate_search_from_recipe(self, repository, lines,
 
93
        discard_excess=False):
 
94
        """Recreate a specific revision search (vs a from-tip search).
 
95
 
 
96
        :param discard_excess: If True, and the search refers to data we don't
 
97
            have, just silently accept that fact - the verb calling
 
98
            recreate_search trusts that clients will look for missing things
 
99
            they expected and get it from elsewhere.
 
100
        """
75
101
        start_keys = set(lines[0].split(' '))
76
102
        exclude_keys = set(lines[1].split(' '))
77
103
        revision_count = int(lines[2])
86
112
                    break
87
113
                search.stop_searching_any(exclude_keys.intersection(next_revs))
88
114
            search_result = search.get_result()
89
 
            if search_result.get_recipe()[2] != revision_count:
 
115
            if (not discard_excess and
 
116
                search_result.get_recipe()[3] != revision_count):
90
117
                # we got back a different amount of data than expected, this
91
118
                # gets reported as NoSuchRevision, because less revisions
92
119
                # indicates missing revisions, and more should never happen as
93
120
                # the excludes list considers ghosts and ensures that ghost
94
121
                # filling races are not a problem.
95
122
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
96
 
            return (search, None)
 
123
            return (search_result, None)
97
124
        finally:
98
125
            repository.unlock()
99
126
 
123
150
        from revision_ids is returned. The verb takes a body containing the
124
151
        current search state, see do_body for details.
125
152
 
 
153
        If 'include-missing:' is in revision_ids, ghosts encountered in the
 
154
        graph traversal for getting parent data are included in the result with
 
155
        a prefix of 'missing:'.
 
156
 
126
157
        :param repository: The repository to query in.
127
158
        :param revision_ids: The utf8 encoded revision_id to answer for.
128
159
        """
147
178
    def _do_repository_request(self, body_bytes):
148
179
        repository = self._repository
149
180
        revision_ids = set(self._revision_ids)
150
 
        search, error = self.recreate_search(repository, body_bytes)
 
181
        include_missing = 'include-missing:' in revision_ids
 
182
        if include_missing:
 
183
            revision_ids.remove('include-missing:')
 
184
        body_lines = body_bytes.split('\n')
 
185
        search_result, error = self.recreate_search_from_recipe(
 
186
            repository, body_lines)
151
187
        if error is not None:
152
188
            return error
153
189
        # TODO might be nice to start up the search again; but thats not
154
190
        # written or tested yet.
155
 
        client_seen_revs = set(search.get_result().get_keys())
 
191
        client_seen_revs = set(search_result.get_keys())
156
192
        # Always include the requested ids.
157
193
        client_seen_revs.difference_update(revision_ids)
158
194
        lines = []
165
201
        while next_revs:
166
202
            queried_revs.update(next_revs)
167
203
            parent_map = repo_graph.get_parent_map(next_revs)
 
204
            current_revs = next_revs
168
205
            next_revs = set()
169
 
            for revision_id, parents in parent_map.iteritems():
170
 
                # adjust for the wire
171
 
                if parents == (_mod_revision.NULL_REVISION,):
172
 
                    parents = ()
173
 
                # prepare the next query
174
 
                next_revs.update(parents)
175
 
                if revision_id not in client_seen_revs:
 
206
            for revision_id in current_revs:
 
207
                missing_rev = False
 
208
                parents = parent_map.get(revision_id)
 
209
                if parents is not None:
 
210
                    # adjust for the wire
 
211
                    if parents == (_mod_revision.NULL_REVISION,):
 
212
                        parents = ()
 
213
                    # prepare the next query
 
214
                    next_revs.update(parents)
 
215
                    encoded_id = revision_id
 
216
                else:
 
217
                    missing_rev = True
 
218
                    encoded_id = "missing:" + revision_id
 
219
                    parents = []
 
220
                if (revision_id not in client_seen_revs and
 
221
                    (not missing_rev or include_missing)):
176
222
                    # Client does not have this revision, give it to it.
177
223
                    # add parents to the result
178
 
                    result[revision_id] = parents
 
224
                    result[encoded_id] = parents
179
225
                    # Approximate the serialized cost of this revision_id.
180
 
                    size_so_far += 2 + len(revision_id) + sum(map(len, parents))
 
226
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
181
227
            # get all the directly asked for parents, and then flesh out to
182
228
            # 64K (compressed) or so. We do one level of depth at a time to
183
229
            # stay in sync with the client. The 250000 magic number is
237
283
        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
238
284
 
239
285
 
 
286
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
 
287
 
 
288
    def do_readlocked_repository_request(self, repository, revno,
 
289
            known_pair):
 
290
        """Find the revid for a given revno, given a known revno/revid pair.
 
291
        
 
292
        New in 1.17.
 
293
        """
 
294
        try:
 
295
            found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
 
296
        except errors.RevisionNotPresent, err:
 
297
            if err.revision_id != known_pair[1]:
 
298
                raise AssertionError(
 
299
                    'get_rev_id_for_revno raised RevisionNotPresent for '
 
300
                    'non-initial revision: ' + err.revision_id)
 
301
            return FailedSmartServerResponse(
 
302
                ('nosuchrevision', err.revision_id))
 
303
        if found_flag:
 
304
            return SuccessfulSmartServerResponse(('ok', result))
 
305
        else:
 
306
            earliest_revno, earliest_revid = result
 
307
            return SuccessfulSmartServerResponse(
 
308
                ('history-incomplete', earliest_revno, earliest_revid))
 
309
 
 
310
 
240
311
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
241
312
 
242
313
    def do_repository_request(self, repository, revision_id):
349
420
        repository = self._repository
350
421
        repository.lock_read()
351
422
        try:
352
 
            search, error = self.recreate_search(repository, body_bytes)
 
423
            search_result, error = self.recreate_search(repository, body_bytes,
 
424
                discard_excess=True)
353
425
            if error is not None:
354
426
                repository.unlock()
355
427
                return error
356
 
            search = search.get_result()
357
428
            source = repository._get_source(self._to_format)
358
 
            stream = source.get_stream(search)
 
429
            stream = source.get_stream(search_result)
359
430
        except Exception:
360
431
            exc_info = sys.exc_info()
361
432
            try:
389
460
        for record in substream:
390
461
            if record.storage_kind in ('chunked', 'fulltext'):
391
462
                serialised = record_to_fulltext_bytes(record)
 
463
            elif record.storage_kind == 'absent':
 
464
                raise ValueError("Absent factory for %s" % (record.key,))
392
465
            else:
393
466
                serialised = record.get_bytes_as(record.storage_kind)
394
467
            if serialised:
512
585
            tarball.close()
513
586
 
514
587
 
515
 
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
 
588
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
516
589
    """Insert a record stream from a RemoteSink into a repository.
517
590
 
518
591
    This gets bytes pushed to it by the network infrastructure and turns that
519
592
    into a bytes iterator using a thread. That is then processed by
520
593
    _byte_stream_to_stream.
 
594
 
 
595
    New in 1.14.
521
596
    """
522
597
 
523
 
    def do_repository_request(self, repository, resume_tokens):
 
598
    def do_repository_request(self, repository, resume_tokens, lock_token):
524
599
        """StreamSink.insert_stream for a remote repository."""
525
 
        repository.lock_write()
 
600
        repository.lock_write(token=lock_token)
 
601
        self.do_insert_stream_request(repository, resume_tokens)
 
602
 
 
603
    def do_insert_stream_request(self, repository, resume_tokens):
526
604
        tokens = [token for token in resume_tokens.split(' ') if token]
527
605
        self.tokens = tokens
528
606
        self.repository = repository
570
648
        else:
571
649
            self.repository.unlock()
572
650
            return SuccessfulSmartServerResponse(('ok', ))
 
651
 
 
652
 
 
653
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
 
654
    """Insert a record stream from a RemoteSink into an unlocked repository.
 
655
 
 
656
    This is the same as SmartServerRepositoryInsertStreamLocked, except it
 
657
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
 
658
    like pack format) repository.
 
659
 
 
660
    New in 1.13.
 
661
    """
 
662
 
 
663
    def do_repository_request(self, repository, resume_tokens):
 
664
        """StreamSink.insert_stream for a remote repository."""
 
665
        repository.lock_write()
 
666
        self.do_insert_stream_request(repository, resume_tokens)
 
667
 
 
668