~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Robert Collins
  • Date: 2007-09-25 08:41:29 UTC
  • mto: This revision was merged to the branch mainline in revision 2862.
  • Revision ID: robertc@robertcollins.net-20070925084129-ca0kd25h23dmunrs
Review feedback.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Server-side repository related request implmentations."""
18
18
 
19
 
import bz2
20
19
import os
21
 
import Queue
22
 
import struct
23
20
import sys
 
21
import tempfile
24
22
import tarfile
25
 
import tempfile
26
 
import threading
27
23
 
28
 
from bzrlib import (
29
 
    errors,
30
 
    graph,
31
 
    osutils,
32
 
    pack,
33
 
    )
 
24
from bzrlib import errors
34
25
from bzrlib.bzrdir import BzrDir
35
26
from bzrlib.smart.request import (
36
27
    FailedSmartServerResponse,
37
28
    SmartServerRequest,
38
29
    SuccessfulSmartServerResponse,
39
30
    )
40
 
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
41
 
from bzrlib import revision as _mod_revision
42
 
from bzrlib.util import bencode
43
 
from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
44
31
 
45
32
 
46
33
class SmartServerRepositoryRequest(SmartServerRequest):
48
35
 
49
36
    def do(self, path, *args):
50
37
        """Execute a repository request.
51
 
 
52
 
        All Repository requests take a path to the repository as their first
53
 
        argument.  The repository must be at the exact path given by the
54
 
        client - no searching is done.
 
38
        
 
39
        The repository must be at the exact path - no searching is done.
55
40
 
56
41
        The actual logic is delegated to self.do_repository_request.
57
42
 
58
 
        :param client_path: The path for the repository as received from the
59
 
            client.
60
 
        :return: A SmartServerResponse from self.do_repository_request().
 
43
        :param path: The path for the repository.
 
44
        :return: A smart server from self.do_repository_request().
61
45
        """
62
 
        transport = self.transport_from_client_path(path)
 
46
        transport = self._backing_transport.clone(path)
63
47
        bzrdir = BzrDir.open_from_transport(transport)
64
 
        # Save the repository for use with do_body.
65
 
        self._repository = bzrdir.open_repository()
66
 
        return self.do_repository_request(self._repository, *args)
67
 
 
68
 
    def do_repository_request(self, repository, *args):
69
 
        """Override to provide an implementation for a verb."""
70
 
        # No-op for verbs that take bodies (None as a result indicates a body
71
 
        # is expected)
72
 
        return None
73
 
 
74
 
    def recreate_search(self, repository, search_bytes):
75
 
        lines = search_bytes.split('\n')
76
 
        if lines[0] == 'ancestry-of':
77
 
            heads = lines[1:]
78
 
            search_result = graph.PendingAncestryResult(heads, repository)
79
 
            return search_result, None
80
 
        elif lines[0] == 'search':
81
 
            return self.recreate_search_from_recipe(repository, lines[1:])
82
 
        else:
83
 
            return (None, FailedSmartServerResponse(('BadSearch',)))
84
 
 
85
 
    def recreate_search_from_recipe(self, repository, lines):
86
 
        start_keys = set(lines[0].split(' '))
87
 
        exclude_keys = set(lines[1].split(' '))
88
 
        revision_count = int(lines[2])
89
 
        repository.lock_read()
90
 
        try:
91
 
            search = repository.get_graph()._make_breadth_first_searcher(
92
 
                start_keys)
93
 
            while True:
94
 
                try:
95
 
                    next_revs = search.next()
96
 
                except StopIteration:
97
 
                    break
98
 
                search.stop_searching_any(exclude_keys.intersection(next_revs))
99
 
            search_result = search.get_result()
100
 
            if search_result.get_recipe()[3] != revision_count:
101
 
                # we got back a different amount of data than expected, this
102
 
                # gets reported as NoSuchRevision, because less revisions
103
 
                # indicates missing revisions, and more should never happen as
104
 
                # the excludes list considers ghosts and ensures that ghost
105
 
                # filling races are not a problem.
106
 
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
107
 
            return (search_result, None)
108
 
        finally:
109
 
            repository.unlock()
110
 
 
111
 
 
112
 
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
113
 
    """Calls self.do_readlocked_repository_request."""
114
 
 
115
 
    def do_repository_request(self, repository, *args):
116
 
        """Read lock a repository for do_readlocked_repository_request."""
117
 
        repository.lock_read()
118
 
        try:
119
 
            return self.do_readlocked_repository_request(repository, *args)
120
 
        finally:
121
 
            repository.unlock()
122
 
 
123
 
 
124
 
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
125
 
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
126
 
 
127
 
    no_extra_results = False
128
 
 
129
 
    def do_repository_request(self, repository, *revision_ids):
130
 
        """Get parent details for some revisions.
131
 
 
132
 
        All the parents for revision_ids are returned. Additionally up to 64KB
133
 
        of additional parent data found by performing a breadth first search
134
 
        from revision_ids is returned. The verb takes a body containing the
135
 
        current search state, see do_body for details.
136
 
 
137
 
        If 'include-missing:' is in revision_ids, ghosts encountered in the
138
 
        graph traversal for getting parent data are included in the result with
139
 
        a prefix of 'missing:'.
140
 
 
141
 
        :param repository: The repository to query in.
142
 
        :param revision_ids: The utf8 encoded revision_id to answer for.
143
 
        """
144
 
        self._revision_ids = revision_ids
145
 
        return None # Signal that we want a body.
146
 
 
147
 
    def do_body(self, body_bytes):
148
 
        """Process the current search state and perform the parent lookup.
149
 
 
150
 
        :return: A smart server response where the body contains an utf8
151
 
            encoded flattened list of the parents of the revisions (the same
152
 
            format as Repository.get_revision_graph) which has been bz2
153
 
            compressed.
154
 
        """
155
 
        repository = self._repository
156
 
        repository.lock_read()
157
 
        try:
158
 
            return self._do_repository_request(body_bytes)
159
 
        finally:
160
 
            repository.unlock()
161
 
 
162
 
    def _do_repository_request(self, body_bytes):
163
 
        repository = self._repository
164
 
        revision_ids = set(self._revision_ids)
165
 
        include_missing = 'include-missing:' in revision_ids
166
 
        if include_missing:
167
 
            revision_ids.remove('include-missing:')
168
 
        body_lines = body_bytes.split('\n')
169
 
        search_result, error = self.recreate_search_from_recipe(
170
 
            repository, body_lines)
171
 
        if error is not None:
172
 
            return error
173
 
        # TODO might be nice to start up the search again; but thats not
174
 
        # written or tested yet.
175
 
        client_seen_revs = set(search_result.get_keys())
176
 
        # Always include the requested ids.
177
 
        client_seen_revs.difference_update(revision_ids)
178
 
        lines = []
179
 
        repo_graph = repository.get_graph()
180
 
        result = {}
181
 
        queried_revs = set()
182
 
        size_so_far = 0
183
 
        next_revs = revision_ids
184
 
        first_loop_done = False
185
 
        while next_revs:
186
 
            queried_revs.update(next_revs)
187
 
            parent_map = repo_graph.get_parent_map(next_revs)
188
 
            current_revs = next_revs
189
 
            next_revs = set()
190
 
            for revision_id in current_revs:
191
 
                missing_rev = False
192
 
                parents = parent_map.get(revision_id)
193
 
                if parents is not None:
194
 
                    # adjust for the wire
195
 
                    if parents == (_mod_revision.NULL_REVISION,):
196
 
                        parents = ()
197
 
                    # prepare the next query
198
 
                    next_revs.update(parents)
199
 
                    encoded_id = revision_id
200
 
                else:
201
 
                    missing_rev = True
202
 
                    encoded_id = "missing:" + revision_id
203
 
                    parents = []
204
 
                if (revision_id not in client_seen_revs and
205
 
                    (not missing_rev or include_missing)):
206
 
                    # Client does not have this revision, give it to it.
207
 
                    # add parents to the result
208
 
                    result[encoded_id] = parents
209
 
                    # Approximate the serialized cost of this revision_id.
210
 
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
211
 
            # get all the directly asked for parents, and then flesh out to
212
 
            # 64K (compressed) or so. We do one level of depth at a time to
213
 
            # stay in sync with the client. The 250000 magic number is
214
 
            # estimated compression ratio taken from bzr.dev itself.
215
 
            if self.no_extra_results or (
216
 
                first_loop_done and size_so_far > 250000):
217
 
                next_revs = set()
218
 
                break
219
 
            # don't query things we've already queried
220
 
            next_revs.difference_update(queried_revs)
221
 
            first_loop_done = True
222
 
 
223
 
        # sorting trivially puts lexographically similar revision ids together.
224
 
        # Compression FTW.
225
 
        for revision, parents in sorted(result.items()):
226
 
            lines.append(' '.join((revision, ) + tuple(parents)))
227
 
 
228
 
        return SuccessfulSmartServerResponse(
229
 
            ('ok', ), bz2.compress('\n'.join(lines)))
230
 
 
231
 
 
232
 
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
233
 
 
234
 
    def do_readlocked_repository_request(self, repository, revision_id):
 
48
        repository = bzrdir.open_repository()
 
49
        return self.do_repository_request(repository, *args)
 
50
 
 
51
 
 
52
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryRequest):
 
53
    
 
54
    def do_repository_request(self, repository, revision_id):
235
55
        """Return the result of repository.get_revision_graph(revision_id).
236
 
 
237
 
        Deprecated as of bzr 1.4, but supported for older clients.
238
 
 
 
56
        
239
57
        :param repository: The repository to query in.
240
58
        :param revision_id: The utf8 encoded revision_id to get a graph from.
241
59
        :return: A smart server response where the body contains an utf8
245
63
            revision_id = None
246
64
 
247
65
        lines = []
248
 
        graph = repository.get_graph()
249
 
        if revision_id:
250
 
            search_ids = [revision_id]
251
 
        else:
252
 
            search_ids = repository.all_revision_ids()
253
 
        search = graph._make_breadth_first_searcher(search_ids)
254
 
        transitive_ids = set()
255
 
        map(transitive_ids.update, list(search))
256
 
        parent_map = graph.get_parent_map(transitive_ids)
257
 
        revision_graph = _strip_NULL_ghosts(parent_map)
258
 
        if revision_id and revision_id not in revision_graph:
 
66
        try:
 
67
            revision_graph = repository.get_revision_graph(revision_id)
 
68
        except errors.NoSuchRevision:
259
69
            # Note that we return an empty body, rather than omitting the body.
260
70
            # This way the client knows that it can always expect to find a body
261
71
            # in the response for this method, even in the error case.
297
107
              firstrev: 1234.230 0
298
108
              latestrev: 345.700 3600
299
109
              revisions: 2
 
110
              size:45
300
111
 
301
112
              But containing only fields returned by the gather_stats() call
302
113
        """
352
163
            return FailedSmartServerResponse(('LockContention',))
353
164
        except errors.UnlockableTransport:
354
165
            return FailedSmartServerResponse(('UnlockableTransport',))
355
 
        except errors.LockFailed, e:
356
 
            return FailedSmartServerResponse(('LockFailed',
357
 
                str(e.lock), str(e.why)))
358
 
        if token is not None:
359
 
            repository.leave_lock_in_place()
 
166
        repository.leave_lock_in_place()
360
167
        repository.unlock()
361
168
        if token is None:
362
169
            token = ''
363
170
        return SuccessfulSmartServerResponse(('ok', token))
364
171
 
365
172
 
366
 
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
367
 
 
368
 
    def do_repository_request(self, repository, to_network_name):
369
 
        """Get a stream for inserting into a to_format repository.
370
 
 
371
 
        :param repository: The repository to stream from.
372
 
        :param to_network_name: The network name of the format of the target
373
 
            repository.
374
 
        """
375
 
        self._to_format = network_format_registry.get(to_network_name)
376
 
        return None # Signal that we want a body.
377
 
 
378
 
    def do_body(self, body_bytes):
379
 
        repository = self._repository
380
 
        repository.lock_read()
381
 
        try:
382
 
            search_result, error = self.recreate_search(repository, body_bytes)
383
 
            if error is not None:
384
 
                repository.unlock()
385
 
                return error
386
 
            source = repository._get_source(self._to_format)
387
 
            stream = source.get_stream(search_result)
388
 
        except Exception:
389
 
            exc_info = sys.exc_info()
390
 
            try:
391
 
                # On non-error, unlocking is done by the body stream handler.
392
 
                repository.unlock()
393
 
            finally:
394
 
                raise exc_info[0], exc_info[1], exc_info[2]
395
 
        return SuccessfulSmartServerResponse(('ok',),
396
 
            body_stream=self.body_stream(stream, repository))
397
 
 
398
 
    def body_stream(self, stream, repository):
399
 
        byte_stream = _stream_to_byte_stream(stream, repository._format)
400
 
        try:
401
 
            for bytes in byte_stream:
402
 
                yield bytes
403
 
        except errors.RevisionNotPresent, e:
404
 
            # This shouldn't be able to happen, but as we don't buffer
405
 
            # everything it can in theory happen.
406
 
            repository.unlock()
407
 
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
408
 
        else:
409
 
            repository.unlock()
410
 
 
411
 
 
412
 
def _stream_to_byte_stream(stream, src_format):
413
 
    """Convert a record stream to a self delimited byte stream."""
414
 
    pack_writer = pack.ContainerSerialiser()
415
 
    yield pack_writer.begin()
416
 
    yield pack_writer.bytes_record(src_format.network_name(), '')
417
 
    for substream_type, substream in stream:
418
 
        for record in substream:
419
 
            if record.storage_kind in ('chunked', 'fulltext'):
420
 
                serialised = record_to_fulltext_bytes(record)
421
 
            else:
422
 
                serialised = record.get_bytes_as(record.storage_kind)
423
 
            if serialised:
424
 
                # Some streams embed the whole stream into the wire
425
 
                # representation of the first record, which means that
426
 
                # later records have no wire representation: we skip them.
427
 
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
428
 
    yield pack_writer.end()
429
 
 
430
 
 
431
 
def _byte_stream_to_stream(byte_stream):
432
 
    """Convert a byte stream into a format and a stream.
433
 
 
434
 
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
435
 
    :return: (RepositoryFormat, stream_generator)
436
 
    """
437
 
    stream_decoder = pack.ContainerPushParser()
438
 
    def record_stream():
439
 
        """Closure to return the substreams."""
440
 
        # May have fully parsed records already.
441
 
        for record in stream_decoder.read_pending_records():
442
 
            record_names, record_bytes = record
443
 
            record_name, = record_names
444
 
            substream_type = record_name[0]
445
 
            substream = NetworkRecordStream([record_bytes])
446
 
            yield substream_type, substream.read()
447
 
        for bytes in byte_stream:
448
 
            stream_decoder.accept_bytes(bytes)
449
 
            for record in stream_decoder.read_pending_records():
450
 
                record_names, record_bytes = record
451
 
                record_name, = record_names
452
 
                substream_type = record_name[0]
453
 
                substream = NetworkRecordStream([record_bytes])
454
 
                yield substream_type, substream.read()
455
 
    for bytes in byte_stream:
456
 
        stream_decoder.accept_bytes(bytes)
457
 
        for record in stream_decoder.read_pending_records(max=1):
458
 
            record_names, src_format_name = record
459
 
            src_format = network_format_registry.get(src_format_name)
460
 
            return src_format, record_stream()
461
 
 
462
 
 
463
173
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
464
174
 
465
175
    def do_repository_request(self, repository, token):
472
182
        return SuccessfulSmartServerResponse(('ok',))
473
183
 
474
184
 
475
 
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
476
 
 
477
 
    def do_repository_request(self, repository, str_bool_new_value):
478
 
        if str_bool_new_value == 'True':
479
 
            new_value = True
480
 
        else:
481
 
            new_value = False
482
 
        repository.set_make_working_trees(new_value)
483
 
        return SuccessfulSmartServerResponse(('ok',))
484
 
 
485
 
 
486
185
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
487
186
    """Get the raw repository files as a tarball.
488
187
 
489
188
    The returned tarball contains a .bzr control directory which in turn
490
189
    contains a repository.
491
 
 
492
 
    This takes one parameter, compression, which currently must be
 
190
    
 
191
    This takes one parameter, compression, which currently must be 
493
192
    "", "gz", or "bz2".
494
193
 
495
194
    This is used to implement the Repository.copy_content_into operation.
496
195
    """
497
196
 
498
197
    def do_repository_request(self, repository, compression):
 
198
        from bzrlib import osutils
 
199
        repo_transport = repository.control_files._transport
499
200
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
500
201
        try:
501
202
            controldir_name = tmp_dirname + '/.bzr'
504
205
            osutils.rmtree(tmp_dirname)
505
206
 
506
207
    def _copy_to_tempdir(self, from_repo):
507
 
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
 
208
        tmp_dirname = tempfile.mkdtemp(prefix='tmpbzrclone')
508
209
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
509
210
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
510
211
        from_repo.copy_content_into(tmp_repo)
517
218
            # all finished; write the tempfile out to the network
518
219
            temp.seek(0)
519
220
            return SuccessfulSmartServerResponse(('ok',), temp.read())
520
 
            # FIXME: Don't read the whole thing into memory here; rather stream
521
 
            # it out from the file onto the network. mbp 20070411
 
221
            # FIXME: Don't read the whole thing into memory here; rather stream it
 
222
            # out from the file onto the network. mbp 20070411
522
223
        finally:
523
224
            temp.close()
524
225
 
534
235
            dirname = dirname.encode(sys.getfilesystemencoding())
535
236
            # python's tarball module includes the whole path by default so
536
237
            # override it
537
 
            if not dirname.endswith('.bzr'):
538
 
                raise ValueError(dirname)
 
238
            assert dirname.endswith('.bzr')
539
239
            tarball.add(dirname, '.bzr') # recursive by default
540
240
        finally:
541
241
            tarball.close()
542
 
 
543
 
 
544
 
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
545
 
    """Insert a record stream from a RemoteSink into a repository.
546
 
 
547
 
    This gets bytes pushed to it by the network infrastructure and turns that
548
 
    into a bytes iterator using a thread. That is then processed by
549
 
    _byte_stream_to_stream.
550
 
 
551
 
    New in 1.14.
552
 
    """
553
 
 
554
 
    def do_repository_request(self, repository, resume_tokens, lock_token):
555
 
        """StreamSink.insert_stream for a remote repository."""
556
 
        repository.lock_write(token=lock_token)
557
 
        self.do_insert_stream_request(repository, resume_tokens)
558
 
 
559
 
    def do_insert_stream_request(self, repository, resume_tokens):
560
 
        tokens = [token for token in resume_tokens.split(' ') if token]
561
 
        self.tokens = tokens
562
 
        self.repository = repository
563
 
        self.queue = Queue.Queue()
564
 
        self.insert_thread = threading.Thread(target=self._inserter_thread)
565
 
        self.insert_thread.start()
566
 
 
567
 
    def do_chunk(self, body_stream_chunk):
568
 
        self.queue.put(body_stream_chunk)
569
 
 
570
 
    def _inserter_thread(self):
571
 
        try:
572
 
            src_format, stream = _byte_stream_to_stream(
573
 
                self.blocking_byte_stream())
574
 
            self.insert_result = self.repository._get_sink().insert_stream(
575
 
                stream, src_format, self.tokens)
576
 
            self.insert_ok = True
577
 
        except:
578
 
            self.insert_exception = sys.exc_info()
579
 
            self.insert_ok = False
580
 
 
581
 
    def blocking_byte_stream(self):
582
 
        while True:
583
 
            bytes = self.queue.get()
584
 
            if bytes is StopIteration:
585
 
                return
586
 
            else:
587
 
                yield bytes
588
 
 
589
 
    def do_end(self):
590
 
        self.queue.put(StopIteration)
591
 
        if self.insert_thread is not None:
592
 
            self.insert_thread.join()
593
 
        if not self.insert_ok:
594
 
            exc_info = self.insert_exception
595
 
            raise exc_info[0], exc_info[1], exc_info[2]
596
 
        write_group_tokens, missing_keys = self.insert_result
597
 
        if write_group_tokens or missing_keys:
598
 
            # bzip needed? missing keys should typically be a small set.
599
 
            # Should this be a streaming body response ?
600
 
            missing_keys = sorted(missing_keys)
601
 
            bytes = bencode.bencode((write_group_tokens, missing_keys))
602
 
            self.repository.unlock()
603
 
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
604
 
        else:
605
 
            self.repository.unlock()
606
 
            return SuccessfulSmartServerResponse(('ok', ))
607
 
 
608
 
 
609
 
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
610
 
    """Insert a record stream from a RemoteSink into an unlocked repository.
611
 
 
612
 
    This is the same as SmartServerRepositoryInsertStreamLocked, except it
613
 
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
614
 
    like pack format) repository.
615
 
 
616
 
    New in 1.13.
617
 
    """
618
 
 
619
 
    def do_repository_request(self, repository, resume_tokens):
620
 
        """StreamSink.insert_stream for a remote repository."""
621
 
        repository.lock_write()
622
 
        self.do_insert_stream_request(repository, resume_tokens)
623
 
 
624