~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Matt Nordhoff
  • Date: 2009-04-04 02:50:01 UTC
  • mfrom: (4253 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4256.
  • Revision ID: mnordhoff@mattnordhoff.com-20090404025001-z1403k0tatmc8l91
Merge bzr.dev, fixing conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Server-side repository related request implmentations."""
18
18
 
19
19
import bz2
20
20
import os
 
21
import Queue
21
22
import struct
22
23
import sys
23
24
import tarfile
24
25
import tempfile
25
26
import threading
26
 
import Queue
27
27
 
28
28
from bzrlib import (
29
29
    errors,
 
30
    graph,
30
31
    osutils,
31
32
    pack,
32
33
    )
39
40
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
40
41
from bzrlib import revision as _mod_revision
41
42
from bzrlib.util import bencode
42
 
from bzrlib.versionedfile import NetworkRecordStream
 
43
from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
43
44
 
44
45
 
45
46
class SmartServerRepositoryRequest(SmartServerRequest):
47
48
 
48
49
    def do(self, path, *args):
49
50
        """Execute a repository request.
50
 
        
 
51
 
51
52
        All Repository requests take a path to the repository as their first
52
53
        argument.  The repository must be at the exact path given by the
53
54
        client - no searching is done.
70
71
        # is expected)
71
72
        return None
72
73
 
73
 
    def recreate_search(self, repository, recipe_bytes):
74
 
        lines = recipe_bytes.split('\n')
 
74
    def recreate_search(self, repository, search_bytes):
 
75
        lines = search_bytes.split('\n')
 
76
        if lines[0] == 'ancestry-of':
 
77
            heads = lines[1:]
 
78
            search_result = graph.PendingAncestryResult(heads, repository)
 
79
            return search_result, None
 
80
        elif lines[0] == 'search':
 
81
            return self.recreate_search_from_recipe(repository, lines[1:])
 
82
        else:
 
83
            return (None, FailedSmartServerResponse(('BadSearch',)))
 
84
 
 
85
    def recreate_search_from_recipe(self, repository, lines):
75
86
        start_keys = set(lines[0].split(' '))
76
87
        exclude_keys = set(lines[1].split(' '))
77
88
        revision_count = int(lines[2])
86
97
                    break
87
98
                search.stop_searching_any(exclude_keys.intersection(next_revs))
88
99
            search_result = search.get_result()
89
 
            if search_result.get_recipe()[2] != revision_count:
 
100
            if search_result.get_recipe()[3] != revision_count:
90
101
                # we got back a different amount of data than expected, this
91
102
                # gets reported as NoSuchRevision, because less revisions
92
103
                # indicates missing revisions, and more should never happen as
93
104
                # the excludes list considers ghosts and ensures that ghost
94
105
                # filling races are not a problem.
95
106
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
96
 
            return (search, None)
 
107
            return (search_result, None)
97
108
        finally:
98
109
            repository.unlock()
99
110
 
112
123
 
113
124
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
114
125
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
115
 
    
 
126
 
 
127
    no_extra_results = False
 
128
 
116
129
    def do_repository_request(self, repository, *revision_ids):
117
130
        """Get parent details for some revisions.
118
 
        
 
131
 
119
132
        All the parents for revision_ids are returned. Additionally up to 64KB
120
133
        of additional parent data found by performing a breadth first search
121
134
        from revision_ids is returned. The verb takes a body containing the
122
135
        current search state, see do_body for details.
123
136
 
 
137
        If 'include-missing:' is in revision_ids, ghosts encountered in the
 
138
        graph traversal for getting parent data are included in the result with
 
139
        a prefix of 'missing:'.
 
140
 
124
141
        :param repository: The repository to query in.
125
142
        :param revision_ids: The utf8 encoded revision_id to answer for.
126
143
        """
145
162
    def _do_repository_request(self, body_bytes):
146
163
        repository = self._repository
147
164
        revision_ids = set(self._revision_ids)
148
 
        search, error = self.recreate_search(repository, body_bytes)
 
165
        include_missing = 'include-missing:' in revision_ids
 
166
        if include_missing:
 
167
            revision_ids.remove('include-missing:')
 
168
        body_lines = body_bytes.split('\n')
 
169
        search_result, error = self.recreate_search_from_recipe(
 
170
            repository, body_lines)
149
171
        if error is not None:
150
172
            return error
151
173
        # TODO might be nice to start up the search again; but thats not
152
174
        # written or tested yet.
153
 
        client_seen_revs = set(search.get_result().get_keys())
 
175
        client_seen_revs = set(search_result.get_keys())
154
176
        # Always include the requested ids.
155
177
        client_seen_revs.difference_update(revision_ids)
156
178
        lines = []
163
185
        while next_revs:
164
186
            queried_revs.update(next_revs)
165
187
            parent_map = repo_graph.get_parent_map(next_revs)
 
188
            current_revs = next_revs
166
189
            next_revs = set()
167
 
            for revision_id, parents in parent_map.iteritems():
168
 
                # adjust for the wire
169
 
                if parents == (_mod_revision.NULL_REVISION,):
170
 
                    parents = ()
171
 
                # prepare the next query
172
 
                next_revs.update(parents)
173
 
                if revision_id not in client_seen_revs:
 
190
            for revision_id in current_revs:
 
191
                missing_rev = False
 
192
                parents = parent_map.get(revision_id)
 
193
                if parents is not None:
 
194
                    # adjust for the wire
 
195
                    if parents == (_mod_revision.NULL_REVISION,):
 
196
                        parents = ()
 
197
                    # prepare the next query
 
198
                    next_revs.update(parents)
 
199
                    encoded_id = revision_id
 
200
                else:
 
201
                    missing_rev = True
 
202
                    encoded_id = "missing:" + revision_id
 
203
                    parents = []
 
204
                if (revision_id not in client_seen_revs and
 
205
                    (not missing_rev or include_missing)):
174
206
                    # Client does not have this revision, give it to it.
175
207
                    # add parents to the result
176
 
                    result[revision_id] = parents
 
208
                    result[encoded_id] = parents
177
209
                    # Approximate the serialized cost of this revision_id.
178
 
                    size_so_far += 2 + len(revision_id) + sum(map(len, parents))
 
210
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
179
211
            # get all the directly asked for parents, and then flesh out to
180
212
            # 64K (compressed) or so. We do one level of depth at a time to
181
213
            # stay in sync with the client. The 250000 magic number is
182
214
            # estimated compression ratio taken from bzr.dev itself.
183
 
            if first_loop_done and size_so_far > 250000:
 
215
            if self.no_extra_results or (
 
216
                first_loop_done and size_so_far > 250000):
184
217
                next_revs = set()
185
218
                break
186
219
            # don't query things we've already queried
197
230
 
198
231
 
199
232
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
200
 
    
 
233
 
201
234
    def do_readlocked_repository_request(self, repository, revision_id):
202
235
        """Return the result of repository.get_revision_graph(revision_id).
203
236
 
204
237
        Deprecated as of bzr 1.4, but supported for older clients.
205
 
        
 
238
 
206
239
        :param repository: The repository to query in.
207
240
        :param revision_id: The utf8 encoded revision_id to get a graph from.
208
241
        :return: A smart server response where the body contains an utf8
330
363
        return SuccessfulSmartServerResponse(('ok', token))
331
364
 
332
365
 
 
366
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
 
367
 
 
368
    def do_repository_request(self, repository, to_network_name):
 
369
        """Get a stream for inserting into a to_format repository.
 
370
 
 
371
        :param repository: The repository to stream from.
 
372
        :param to_network_name: The network name of the format of the target
 
373
            repository.
 
374
        """
 
375
        self._to_format = network_format_registry.get(to_network_name)
 
376
        return None # Signal that we want a body.
 
377
 
 
378
    def do_body(self, body_bytes):
 
379
        repository = self._repository
 
380
        repository.lock_read()
 
381
        try:
 
382
            search_result, error = self.recreate_search(repository, body_bytes)
 
383
            if error is not None:
 
384
                repository.unlock()
 
385
                return error
 
386
            source = repository._get_source(self._to_format)
 
387
            stream = source.get_stream(search_result)
 
388
        except Exception:
 
389
            exc_info = sys.exc_info()
 
390
            try:
 
391
                # On non-error, unlocking is done by the body stream handler.
 
392
                repository.unlock()
 
393
            finally:
 
394
                raise exc_info[0], exc_info[1], exc_info[2]
 
395
        return SuccessfulSmartServerResponse(('ok',),
 
396
            body_stream=self.body_stream(stream, repository))
 
397
 
 
398
    def body_stream(self, stream, repository):
 
399
        byte_stream = _stream_to_byte_stream(stream, repository._format)
 
400
        try:
 
401
            for bytes in byte_stream:
 
402
                yield bytes
 
403
        except errors.RevisionNotPresent, e:
 
404
            # This shouldn't be able to happen, but as we don't buffer
 
405
            # everything it can in theory happen.
 
406
            repository.unlock()
 
407
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
408
        else:
 
409
            repository.unlock()
 
410
 
 
411
 
 
412
def _stream_to_byte_stream(stream, src_format):
 
413
    """Convert a record stream to a self delimited byte stream."""
 
414
    pack_writer = pack.ContainerSerialiser()
 
415
    yield pack_writer.begin()
 
416
    yield pack_writer.bytes_record(src_format.network_name(), '')
 
417
    for substream_type, substream in stream:
 
418
        for record in substream:
 
419
            if record.storage_kind in ('chunked', 'fulltext'):
 
420
                serialised = record_to_fulltext_bytes(record)
 
421
            else:
 
422
                serialised = record.get_bytes_as(record.storage_kind)
 
423
            if serialised:
 
424
                # Some streams embed the whole stream into the wire
 
425
                # representation of the first record, which means that
 
426
                # later records have no wire representation: we skip them.
 
427
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
 
428
    yield pack_writer.end()
 
429
 
 
430
 
 
431
def _byte_stream_to_stream(byte_stream):
 
432
    """Convert a byte stream into a format and a stream.
 
433
 
 
434
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
 
435
    :return: (RepositoryFormat, stream_generator)
 
436
    """
 
437
    stream_decoder = pack.ContainerPushParser()
 
438
    def record_stream():
 
439
        """Closure to return the substreams."""
 
440
        # May have fully parsed records already.
 
441
        for record in stream_decoder.read_pending_records():
 
442
            record_names, record_bytes = record
 
443
            record_name, = record_names
 
444
            substream_type = record_name[0]
 
445
            substream = NetworkRecordStream([record_bytes])
 
446
            yield substream_type, substream.read()
 
447
        for bytes in byte_stream:
 
448
            stream_decoder.accept_bytes(bytes)
 
449
            for record in stream_decoder.read_pending_records():
 
450
                record_names, record_bytes = record
 
451
                record_name, = record_names
 
452
                substream_type = record_name[0]
 
453
                substream = NetworkRecordStream([record_bytes])
 
454
                yield substream_type, substream.read()
 
455
    for bytes in byte_stream:
 
456
        stream_decoder.accept_bytes(bytes)
 
457
        for record in stream_decoder.read_pending_records(max=1):
 
458
            record_names, src_format_name = record
 
459
            src_format = network_format_registry.get(src_format_name)
 
460
            return src_format, record_stream()
 
461
 
 
462
 
333
463
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
334
464
 
335
465
    def do_repository_request(self, repository, token):
358
488
 
359
489
    The returned tarball contains a .bzr control directory which in turn
360
490
    contains a repository.
361
 
    
362
 
    This takes one parameter, compression, which currently must be 
 
491
 
 
492
    This takes one parameter, compression, which currently must be
363
493
    "", "gz", or "bz2".
364
494
 
365
495
    This is used to implement the Repository.copy_content_into operation.
411
541
            tarball.close()
412
542
 
413
543
 
414
 
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
415
 
 
416
 
    def do_repository_request(self, repository):
 
544
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
 
545
    """Insert a record stream from a RemoteSink into a repository.
 
546
 
 
547
    This gets bytes pushed to it by the network infrastructure and turns that
 
548
    into a bytes iterator using a thread. That is then processed by
 
549
    _byte_stream_to_stream.
 
550
 
 
551
    New in 1.14.
 
552
    """
 
553
 
 
554
    def do_repository_request(self, repository, resume_tokens, lock_token):
417
555
        """StreamSink.insert_stream for a remote repository."""
418
 
        repository.lock_write()
419
 
        repository.start_write_group()
 
556
        repository.lock_write(token=lock_token)
 
557
        self.do_insert_stream_request(repository, resume_tokens)
 
558
 
 
559
    def do_insert_stream_request(self, repository, resume_tokens):
 
560
        tokens = [token for token in resume_tokens.split(' ') if token]
 
561
        self.tokens = tokens
420
562
        self.repository = repository
421
 
        self.stream_decoder = pack.ContainerPushParser()
422
 
        self.src_format = None
423
563
        self.queue = Queue.Queue()
424
 
        self.insert_thread = None
 
564
        self.insert_thread = threading.Thread(target=self._inserter_thread)
 
565
        self.insert_thread.start()
425
566
 
426
567
    def do_chunk(self, body_stream_chunk):
427
 
        self.stream_decoder.accept_bytes(body_stream_chunk)
428
 
        for record in self.stream_decoder.read_pending_records():
429
 
            record_names, record_bytes = record
430
 
            if self.src_format is None:
431
 
                src_format_name = record_bytes
432
 
                src_format = network_format_registry.get(src_format_name)
433
 
                self.src_format = src_format
434
 
                self.insert_thread = threading.Thread(target=self._inserter_thread)
435
 
                self.insert_thread.start()
436
 
            else:
437
 
                record_name, = record_names
438
 
                substream_type = record_name[0]
439
 
                stream = NetworkRecordStream([record_bytes])
440
 
                for record in stream.read():
441
 
                    self.queue.put((substream_type, [record]))
 
568
        self.queue.put(body_stream_chunk)
442
569
 
443
570
    def _inserter_thread(self):
444
 
        self.repository._get_sink().insert_stream(self.blocking_read_stream(),
445
 
                self.src_format)
 
571
        try:
 
572
            src_format, stream = _byte_stream_to_stream(
 
573
                self.blocking_byte_stream())
 
574
            self.insert_result = self.repository._get_sink().insert_stream(
 
575
                stream, src_format, self.tokens)
 
576
            self.insert_ok = True
 
577
        except:
 
578
            self.insert_exception = sys.exc_info()
 
579
            self.insert_ok = False
446
580
 
447
 
    def blocking_read_stream(self):
 
581
    def blocking_byte_stream(self):
448
582
        while True:
449
 
            item = self.queue.get()
450
 
            if item is StopIteration:
 
583
            bytes = self.queue.get()
 
584
            if bytes is StopIteration:
451
585
                return
452
586
            else:
453
 
                yield item
 
587
                yield bytes
454
588
 
455
589
    def do_end(self):
456
590
        self.queue.put(StopIteration)
457
591
        if self.insert_thread is not None:
458
592
            self.insert_thread.join()
459
 
        self.repository.commit_write_group()
460
 
        self.repository.unlock()
461
 
        return SuccessfulSmartServerResponse(('ok', ))
 
593
        if not self.insert_ok:
 
594
            exc_info = self.insert_exception
 
595
            raise exc_info[0], exc_info[1], exc_info[2]
 
596
        write_group_tokens, missing_keys = self.insert_result
 
597
        if write_group_tokens or missing_keys:
 
598
            # bzip needed? missing keys should typically be a small set.
 
599
            # Should this be a streaming body response ?
 
600
            missing_keys = sorted(missing_keys)
 
601
            bytes = bencode.bencode((write_group_tokens, missing_keys))
 
602
            self.repository.unlock()
 
603
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
 
604
        else:
 
605
            self.repository.unlock()
 
606
            return SuccessfulSmartServerResponse(('ok', ))
 
607
 
 
608
 
 
609
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
 
610
    """Insert a record stream from a RemoteSink into an unlocked repository.
 
611
 
 
612
    This is the same as SmartServerRepositoryInsertStreamLocked, except it
 
613
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
 
614
    like pack format) repository.
 
615
 
 
616
    New in 1.13.
 
617
    """
 
618
 
 
619
    def do_repository_request(self, repository, resume_tokens):
 
620
        """StreamSink.insert_stream for a remote repository."""
 
621
        repository.lock_write()
 
622
        self.do_insert_stream_request(repository, resume_tokens)
462
623
 
463
624