~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-03-06 06:48:25 UTC
  • mfrom: (4070.8.6 debug-config)
  • Revision ID: pqm@pqm.ubuntu.com-20090306064825-kbpwggw21dygeix6
(mbp) debug_flags configuration option

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Server-side repository related request implmentations."""
18
18
 
27
27
 
28
28
from bzrlib import (
29
29
    errors,
30
 
    graph,
31
30
    osutils,
32
31
    pack,
33
32
    )
71
70
        # is expected)
72
71
        return None
73
72
 
74
 
    def recreate_search(self, repository, search_bytes):
75
 
        lines = search_bytes.split('\n')
76
 
        if lines[0] == 'ancestry-of':
77
 
            heads = lines[1:]
78
 
            search_result = graph.PendingAncestryResult(heads, repository)
79
 
            return search_result, None
80
 
        elif lines[0] == 'search':
81
 
            return self.recreate_search_from_recipe(repository, lines[1:])
82
 
        else:
83
 
            return (None, FailedSmartServerResponse(('BadSearch',)))
84
 
 
85
 
    def recreate_search_from_recipe(self, repository, lines):
 
73
    def recreate_search(self, repository, recipe_bytes):
 
74
        lines = recipe_bytes.split('\n')
86
75
        start_keys = set(lines[0].split(' '))
87
76
        exclude_keys = set(lines[1].split(' '))
88
77
        revision_count = int(lines[2])
97
86
                    break
98
87
                search.stop_searching_any(exclude_keys.intersection(next_revs))
99
88
            search_result = search.get_result()
100
 
            if search_result.get_recipe()[3] != revision_count:
 
89
            if search_result.get_recipe()[2] != revision_count:
101
90
                # we got back a different amount of data than expected, this
102
91
                # gets reported as NoSuchRevision, because less revisions
103
92
                # indicates missing revisions, and more should never happen as
104
93
                # the excludes list considers ghosts and ensures that ghost
105
94
                # filling races are not a problem.
106
95
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
107
 
            return (search_result, None)
 
96
            return (search, None)
108
97
        finally:
109
98
            repository.unlock()
110
99
 
134
123
        from revision_ids is returned. The verb takes a body containing the
135
124
        current search state, see do_body for details.
136
125
 
137
 
        If 'include-missing:' is in revision_ids, ghosts encountered in the
138
 
        graph traversal for getting parent data are included in the result with
139
 
        a prefix of 'missing:'.
140
 
 
141
126
        :param repository: The repository to query in.
142
127
        :param revision_ids: The utf8 encoded revision_id to answer for.
143
128
        """
162
147
    def _do_repository_request(self, body_bytes):
163
148
        repository = self._repository
164
149
        revision_ids = set(self._revision_ids)
165
 
        include_missing = 'include-missing:' in revision_ids
166
 
        if include_missing:
167
 
            revision_ids.remove('include-missing:')
168
 
        body_lines = body_bytes.split('\n')
169
 
        search_result, error = self.recreate_search_from_recipe(
170
 
            repository, body_lines)
 
150
        search, error = self.recreate_search(repository, body_bytes)
171
151
        if error is not None:
172
152
            return error
173
153
        # TODO might be nice to start up the search again; but thats not
174
154
        # written or tested yet.
175
 
        client_seen_revs = set(search_result.get_keys())
 
155
        client_seen_revs = set(search.get_result().get_keys())
176
156
        # Always include the requested ids.
177
157
        client_seen_revs.difference_update(revision_ids)
178
158
        lines = []
185
165
        while next_revs:
186
166
            queried_revs.update(next_revs)
187
167
            parent_map = repo_graph.get_parent_map(next_revs)
188
 
            current_revs = next_revs
189
168
            next_revs = set()
190
 
            for revision_id in current_revs:
191
 
                missing_rev = False
192
 
                parents = parent_map.get(revision_id)
193
 
                if parents is not None:
194
 
                    # adjust for the wire
195
 
                    if parents == (_mod_revision.NULL_REVISION,):
196
 
                        parents = ()
197
 
                    # prepare the next query
198
 
                    next_revs.update(parents)
199
 
                    encoded_id = revision_id
200
 
                else:
201
 
                    missing_rev = True
202
 
                    encoded_id = "missing:" + revision_id
203
 
                    parents = []
204
 
                if (revision_id not in client_seen_revs and
205
 
                    (not missing_rev or include_missing)):
 
169
            for revision_id, parents in parent_map.iteritems():
 
170
                # adjust for the wire
 
171
                if parents == (_mod_revision.NULL_REVISION,):
 
172
                    parents = ()
 
173
                # prepare the next query
 
174
                next_revs.update(parents)
 
175
                if revision_id not in client_seen_revs:
206
176
                    # Client does not have this revision, give it to it.
207
177
                    # add parents to the result
208
 
                    result[encoded_id] = parents
 
178
                    result[revision_id] = parents
209
179
                    # Approximate the serialized cost of this revision_id.
210
 
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
 
180
                    size_so_far += 2 + len(revision_id) + sum(map(len, parents))
211
181
            # get all the directly asked for parents, and then flesh out to
212
182
            # 64K (compressed) or so. We do one level of depth at a time to
213
183
            # stay in sync with the client. The 250000 magic number is
379
349
        repository = self._repository
380
350
        repository.lock_read()
381
351
        try:
382
 
            search_result, error = self.recreate_search(repository, body_bytes)
 
352
            search, error = self.recreate_search(repository, body_bytes)
383
353
            if error is not None:
384
354
                repository.unlock()
385
355
                return error
 
356
            search = search.get_result()
386
357
            source = repository._get_source(self._to_format)
387
 
            stream = source.get_stream(search_result)
 
358
            stream = source.get_stream(search)
388
359
        except Exception:
389
360
            exc_info = sys.exc_info()
390
361
            try:
541
512
            tarball.close()
542
513
 
543
514
 
544
 
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
 
515
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
545
516
    """Insert a record stream from a RemoteSink into a repository.
546
517
 
547
518
    This gets bytes pushed to it by the network infrastructure and turns that
548
519
    into a bytes iterator using a thread. That is then processed by
549
520
    _byte_stream_to_stream.
550
 
 
551
 
    New in 1.14.
552
521
    """
553
522
 
554
 
    def do_repository_request(self, repository, resume_tokens, lock_token):
 
523
    def do_repository_request(self, repository, resume_tokens):
555
524
        """StreamSink.insert_stream for a remote repository."""
556
 
        repository.lock_write(token=lock_token)
557
 
        self.do_insert_stream_request(repository, resume_tokens)
558
 
 
559
 
    def do_insert_stream_request(self, repository, resume_tokens):
 
525
        repository.lock_write()
560
526
        tokens = [token for token in resume_tokens.split(' ') if token]
561
527
        self.tokens = tokens
562
528
        self.repository = repository
604
570
        else:
605
571
            self.repository.unlock()
606
572
            return SuccessfulSmartServerResponse(('ok', ))
607
 
 
608
 
 
609
 
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
610
 
    """Insert a record stream from a RemoteSink into an unlocked repository.
611
 
 
612
 
    This is the same as SmartServerRepositoryInsertStreamLocked, except it
613
 
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
614
 
    like pack format) repository.
615
 
 
616
 
    New in 1.13.
617
 
    """
618
 
 
619
 
    def do_repository_request(self, repository, resume_tokens):
620
 
        """StreamSink.insert_stream for a remote repository."""
621
 
        repository.lock_write()
622
 
        self.do_insert_stream_request(repository, resume_tokens)
623
 
 
624