~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Server-side repository related request implmentations."""
18
18
 
 
19
import bz2
19
20
import os
 
21
import Queue
 
22
import struct
20
23
import sys
 
24
import tarfile
21
25
import tempfile
22
 
import tarfile
 
26
import threading
23
27
 
24
 
from bzrlib import errors
 
28
from bzrlib import (
 
29
    errors,
 
30
    graph,
 
31
    osutils,
 
32
    pack,
 
33
    )
25
34
from bzrlib.bzrdir import BzrDir
26
35
from bzrlib.smart.request import (
27
36
    FailedSmartServerResponse,
28
37
    SmartServerRequest,
29
38
    SuccessfulSmartServerResponse,
30
39
    )
 
40
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
 
41
from bzrlib import revision as _mod_revision
 
42
from bzrlib.util import bencode
 
43
from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
31
44
 
32
45
 
33
46
class SmartServerRepositoryRequest(SmartServerRequest):
35
48
 
36
49
    def do(self, path, *args):
37
50
        """Execute a repository request.
38
 
        
39
 
        The repository must be at the exact path - no searching is done.
 
51
 
 
52
        All Repository requests take a path to the repository as their first
 
53
        argument.  The repository must be at the exact path given by the
 
54
        client - no searching is done.
40
55
 
41
56
        The actual logic is delegated to self.do_repository_request.
42
57
 
43
 
        :param path: The path for the repository.
44
 
        :return: A smart server from self.do_repository_request().
 
58
        :param client_path: The path for the repository as received from the
 
59
            client.
 
60
        :return: A SmartServerResponse from self.do_repository_request().
45
61
        """
46
 
        transport = self._backing_transport.clone(path)
 
62
        transport = self.transport_from_client_path(path)
47
63
        bzrdir = BzrDir.open_from_transport(transport)
48
 
        repository = bzrdir.open_repository()
49
 
        return self.do_repository_request(repository, *args)
50
 
 
51
 
 
52
 
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryRequest):
53
 
    
54
 
    def do_repository_request(self, repository, revision_id):
 
64
        # Save the repository for use with do_body.
 
65
        self._repository = bzrdir.open_repository()
 
66
        return self.do_repository_request(self._repository, *args)
 
67
 
 
68
    def do_repository_request(self, repository, *args):
 
69
        """Override to provide an implementation for a verb."""
 
70
        # No-op for verbs that take bodies (None as a result indicates a body
 
71
        # is expected)
 
72
        return None
 
73
 
 
74
    def recreate_search(self, repository, search_bytes, discard_excess=False):
 
75
        """Recreate a search from its serialised form.
 
76
 
 
77
        :param discard_excess: If True, and the search refers to data we don't
 
78
            have, just silently accept that fact - the verb calling
 
79
            recreate_search trusts that clients will look for missing things
 
80
            they expected and get it from elsewhere.
 
81
        """
 
82
        lines = search_bytes.split('\n')
 
83
        if lines[0] == 'ancestry-of':
 
84
            heads = lines[1:]
 
85
            search_result = graph.PendingAncestryResult(heads, repository)
 
86
            return search_result, None
 
87
        elif lines[0] == 'search':
 
88
            return self.recreate_search_from_recipe(repository, lines[1:],
 
89
                discard_excess=discard_excess)
 
90
        else:
 
91
            return (None, FailedSmartServerResponse(('BadSearch',)))
 
92
 
 
93
    def recreate_search_from_recipe(self, repository, lines,
 
94
        discard_excess=False):
 
95
        """Recreate a specific revision search (vs a from-tip search).
 
96
 
 
97
        :param discard_excess: If True, and the search refers to data we don't
 
98
            have, just silently accept that fact - the verb calling
 
99
            recreate_search trusts that clients will look for missing things
 
100
            they expected and get it from elsewhere.
 
101
        """
 
102
        start_keys = set(lines[0].split(' '))
 
103
        exclude_keys = set(lines[1].split(' '))
 
104
        revision_count = int(lines[2])
 
105
        repository.lock_read()
 
106
        try:
 
107
            search = repository.get_graph()._make_breadth_first_searcher(
 
108
                start_keys)
 
109
            while True:
 
110
                try:
 
111
                    next_revs = search.next()
 
112
                except StopIteration:
 
113
                    break
 
114
                search.stop_searching_any(exclude_keys.intersection(next_revs))
 
115
            search_result = search.get_result()
 
116
            if (not discard_excess and
 
117
                search_result.get_recipe()[3] != revision_count):
 
118
                # we got back a different amount of data than expected, this
 
119
                # gets reported as NoSuchRevision, because less revisions
 
120
                # indicates missing revisions, and more should never happen as
 
121
                # the excludes list considers ghosts and ensures that ghost
 
122
                # filling races are not a problem.
 
123
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
124
            return (search_result, None)
 
125
        finally:
 
126
            repository.unlock()
 
127
 
 
128
 
 
129
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
 
130
    """Calls self.do_readlocked_repository_request."""
 
131
 
 
132
    def do_repository_request(self, repository, *args):
 
133
        """Read lock a repository for do_readlocked_repository_request."""
 
134
        repository.lock_read()
 
135
        try:
 
136
            return self.do_readlocked_repository_request(repository, *args)
 
137
        finally:
 
138
            repository.unlock()
 
139
 
 
140
 
 
141
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
 
142
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
 
143
 
 
144
    no_extra_results = False
 
145
 
 
146
    def do_repository_request(self, repository, *revision_ids):
 
147
        """Get parent details for some revisions.
 
148
 
 
149
        All the parents for revision_ids are returned. Additionally up to 64KB
 
150
        of additional parent data found by performing a breadth first search
 
151
        from revision_ids is returned. The verb takes a body containing the
 
152
        current search state, see do_body for details.
 
153
 
 
154
        If 'include-missing:' is in revision_ids, ghosts encountered in the
 
155
        graph traversal for getting parent data are included in the result with
 
156
        a prefix of 'missing:'.
 
157
 
 
158
        :param repository: The repository to query in.
 
159
        :param revision_ids: The utf8 encoded revision_id to answer for.
 
160
        """
 
161
        self._revision_ids = revision_ids
 
162
        return None # Signal that we want a body.
 
163
 
 
164
    def do_body(self, body_bytes):
 
165
        """Process the current search state and perform the parent lookup.
 
166
 
 
167
        :return: A smart server response where the body contains an utf8
 
168
            encoded flattened list of the parents of the revisions (the same
 
169
            format as Repository.get_revision_graph) which has been bz2
 
170
            compressed.
 
171
        """
 
172
        repository = self._repository
 
173
        repository.lock_read()
 
174
        try:
 
175
            return self._do_repository_request(body_bytes)
 
176
        finally:
 
177
            repository.unlock()
 
178
 
 
179
    def _do_repository_request(self, body_bytes):
 
180
        repository = self._repository
 
181
        revision_ids = set(self._revision_ids)
 
182
        include_missing = 'include-missing:' in revision_ids
 
183
        if include_missing:
 
184
            revision_ids.remove('include-missing:')
 
185
        body_lines = body_bytes.split('\n')
 
186
        search_result, error = self.recreate_search_from_recipe(
 
187
            repository, body_lines)
 
188
        if error is not None:
 
189
            return error
 
190
        # TODO might be nice to start up the search again; but thats not
 
191
        # written or tested yet.
 
192
        client_seen_revs = set(search_result.get_keys())
 
193
        # Always include the requested ids.
 
194
        client_seen_revs.difference_update(revision_ids)
 
195
        lines = []
 
196
        repo_graph = repository.get_graph()
 
197
        result = {}
 
198
        queried_revs = set()
 
199
        size_so_far = 0
 
200
        next_revs = revision_ids
 
201
        first_loop_done = False
 
202
        while next_revs:
 
203
            queried_revs.update(next_revs)
 
204
            parent_map = repo_graph.get_parent_map(next_revs)
 
205
            current_revs = next_revs
 
206
            next_revs = set()
 
207
            for revision_id in current_revs:
 
208
                missing_rev = False
 
209
                parents = parent_map.get(revision_id)
 
210
                if parents is not None:
 
211
                    # adjust for the wire
 
212
                    if parents == (_mod_revision.NULL_REVISION,):
 
213
                        parents = ()
 
214
                    # prepare the next query
 
215
                    next_revs.update(parents)
 
216
                    encoded_id = revision_id
 
217
                else:
 
218
                    missing_rev = True
 
219
                    encoded_id = "missing:" + revision_id
 
220
                    parents = []
 
221
                if (revision_id not in client_seen_revs and
 
222
                    (not missing_rev or include_missing)):
 
223
                    # Client does not have this revision, give it to it.
 
224
                    # add parents to the result
 
225
                    result[encoded_id] = parents
 
226
                    # Approximate the serialized cost of this revision_id.
 
227
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
 
228
            # get all the directly asked for parents, and then flesh out to
 
229
            # 64K (compressed) or so. We do one level of depth at a time to
 
230
            # stay in sync with the client. The 250000 magic number is
 
231
            # estimated compression ratio taken from bzr.dev itself.
 
232
            if self.no_extra_results or (
 
233
                first_loop_done and size_so_far > 250000):
 
234
                next_revs = set()
 
235
                break
 
236
            # don't query things we've already queried
 
237
            next_revs.difference_update(queried_revs)
 
238
            first_loop_done = True
 
239
 
 
240
        # sorting trivially puts lexographically similar revision ids together.
 
241
        # Compression FTW.
 
242
        for revision, parents in sorted(result.items()):
 
243
            lines.append(' '.join((revision, ) + tuple(parents)))
 
244
 
 
245
        return SuccessfulSmartServerResponse(
 
246
            ('ok', ), bz2.compress('\n'.join(lines)))
 
247
 
 
248
 
 
249
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
 
250
 
 
251
    def do_readlocked_repository_request(self, repository, revision_id):
55
252
        """Return the result of repository.get_revision_graph(revision_id).
56
 
        
 
253
 
 
254
        Deprecated as of bzr 1.4, but supported for older clients.
 
255
 
57
256
        :param repository: The repository to query in.
58
257
        :param revision_id: The utf8 encoded revision_id to get a graph from.
59
258
        :return: A smart server response where the body contains an utf8
63
262
            revision_id = None
64
263
 
65
264
        lines = []
66
 
        try:
67
 
            revision_graph = repository.get_revision_graph(revision_id)
68
 
        except errors.NoSuchRevision:
 
265
        graph = repository.get_graph()
 
266
        if revision_id:
 
267
            search_ids = [revision_id]
 
268
        else:
 
269
            search_ids = repository.all_revision_ids()
 
270
        search = graph._make_breadth_first_searcher(search_ids)
 
271
        transitive_ids = set()
 
272
        map(transitive_ids.update, list(search))
 
273
        parent_map = graph.get_parent_map(transitive_ids)
 
274
        revision_graph = _strip_NULL_ghosts(parent_map)
 
275
        if revision_id and revision_id not in revision_graph:
69
276
            # Note that we return an empty body, rather than omitting the body.
70
277
            # This way the client knows that it can always expect to find a body
71
278
            # in the response for this method, even in the error case.
72
279
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
73
280
 
74
281
        for revision, parents in revision_graph.items():
75
 
            lines.append(' '.join([revision,] + parents))
 
282
            lines.append(' '.join((revision, ) + tuple(parents)))
76
283
 
77
284
        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
78
285
 
107
314
              firstrev: 1234.230 0
108
315
              latestrev: 345.700 3600
109
316
              revisions: 2
110
 
              size:45
111
317
 
112
318
              But containing only fields returned by the gather_stats() call
113
319
        """
163
369
            return FailedSmartServerResponse(('LockContention',))
164
370
        except errors.UnlockableTransport:
165
371
            return FailedSmartServerResponse(('UnlockableTransport',))
166
 
        repository.leave_lock_in_place()
 
372
        except errors.LockFailed, e:
 
373
            return FailedSmartServerResponse(('LockFailed',
 
374
                str(e.lock), str(e.why)))
 
375
        if token is not None:
 
376
            repository.leave_lock_in_place()
167
377
        repository.unlock()
168
378
        if token is None:
169
379
            token = ''
170
380
        return SuccessfulSmartServerResponse(('ok', token))
171
381
 
172
382
 
 
383
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
 
384
 
 
385
    def do_repository_request(self, repository, to_network_name):
 
386
        """Get a stream for inserting into a to_format repository.
 
387
 
 
388
        :param repository: The repository to stream from.
 
389
        :param to_network_name: The network name of the format of the target
 
390
            repository.
 
391
        """
 
392
        self._to_format = network_format_registry.get(to_network_name)
 
393
        return None # Signal that we want a body.
 
394
 
 
395
    def do_body(self, body_bytes):
 
396
        repository = self._repository
 
397
        repository.lock_read()
 
398
        try:
 
399
            search_result, error = self.recreate_search(repository, body_bytes,
 
400
                discard_excess=True)
 
401
            if error is not None:
 
402
                repository.unlock()
 
403
                return error
 
404
            source = repository._get_source(self._to_format)
 
405
            stream = source.get_stream(search_result)
 
406
        except Exception:
 
407
            exc_info = sys.exc_info()
 
408
            try:
 
409
                # On non-error, unlocking is done by the body stream handler.
 
410
                repository.unlock()
 
411
            finally:
 
412
                raise exc_info[0], exc_info[1], exc_info[2]
 
413
        return SuccessfulSmartServerResponse(('ok',),
 
414
            body_stream=self.body_stream(stream, repository))
 
415
 
 
416
    def body_stream(self, stream, repository):
 
417
        byte_stream = _stream_to_byte_stream(stream, repository._format)
 
418
        try:
 
419
            for bytes in byte_stream:
 
420
                yield bytes
 
421
        except errors.RevisionNotPresent, e:
 
422
            # This shouldn't be able to happen, but as we don't buffer
 
423
            # everything it can in theory happen.
 
424
            repository.unlock()
 
425
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
426
        else:
 
427
            repository.unlock()
 
428
 
 
429
 
 
430
def _stream_to_byte_stream(stream, src_format):
 
431
    """Convert a record stream to a self delimited byte stream."""
 
432
    pack_writer = pack.ContainerSerialiser()
 
433
    yield pack_writer.begin()
 
434
    yield pack_writer.bytes_record(src_format.network_name(), '')
 
435
    for substream_type, substream in stream:
 
436
        for record in substream:
 
437
            if record.storage_kind in ('chunked', 'fulltext'):
 
438
                serialised = record_to_fulltext_bytes(record)
 
439
            else:
 
440
                serialised = record.get_bytes_as(record.storage_kind)
 
441
            if serialised:
 
442
                # Some streams embed the whole stream into the wire
 
443
                # representation of the first record, which means that
 
444
                # later records have no wire representation: we skip them.
 
445
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
 
446
    yield pack_writer.end()
 
447
 
 
448
 
 
449
def _byte_stream_to_stream(byte_stream):
 
450
    """Convert a byte stream into a format and a stream.
 
451
 
 
452
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
 
453
    :return: (RepositoryFormat, stream_generator)
 
454
    """
 
455
    stream_decoder = pack.ContainerPushParser()
 
456
    def record_stream():
 
457
        """Closure to return the substreams."""
 
458
        # May have fully parsed records already.
 
459
        for record in stream_decoder.read_pending_records():
 
460
            record_names, record_bytes = record
 
461
            record_name, = record_names
 
462
            substream_type = record_name[0]
 
463
            substream = NetworkRecordStream([record_bytes])
 
464
            yield substream_type, substream.read()
 
465
        for bytes in byte_stream:
 
466
            stream_decoder.accept_bytes(bytes)
 
467
            for record in stream_decoder.read_pending_records():
 
468
                record_names, record_bytes = record
 
469
                record_name, = record_names
 
470
                substream_type = record_name[0]
 
471
                substream = NetworkRecordStream([record_bytes])
 
472
                yield substream_type, substream.read()
 
473
    for bytes in byte_stream:
 
474
        stream_decoder.accept_bytes(bytes)
 
475
        for record in stream_decoder.read_pending_records(max=1):
 
476
            record_names, src_format_name = record
 
477
            src_format = network_format_registry.get(src_format_name)
 
478
            return src_format, record_stream()
 
479
 
 
480
 
173
481
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
174
482
 
175
483
    def do_repository_request(self, repository, token):
182
490
        return SuccessfulSmartServerResponse(('ok',))
183
491
 
184
492
 
 
493
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
 
494
 
 
495
    def do_repository_request(self, repository, str_bool_new_value):
 
496
        if str_bool_new_value == 'True':
 
497
            new_value = True
 
498
        else:
 
499
            new_value = False
 
500
        repository.set_make_working_trees(new_value)
 
501
        return SuccessfulSmartServerResponse(('ok',))
 
502
 
 
503
 
185
504
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
186
505
    """Get the raw repository files as a tarball.
187
506
 
188
507
    The returned tarball contains a .bzr control directory which in turn
189
508
    contains a repository.
190
 
    
191
 
    This takes one parameter, compression, which currently must be 
 
509
 
 
510
    This takes one parameter, compression, which currently must be
192
511
    "", "gz", or "bz2".
193
512
 
194
513
    This is used to implement the Repository.copy_content_into operation.
195
514
    """
196
515
 
197
516
    def do_repository_request(self, repository, compression):
198
 
        from bzrlib import osutils
199
 
        repo_transport = repository.control_files._transport
200
517
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
201
518
        try:
202
519
            controldir_name = tmp_dirname + '/.bzr'
205
522
            osutils.rmtree(tmp_dirname)
206
523
 
207
524
    def _copy_to_tempdir(self, from_repo):
208
 
        tmp_dirname = tempfile.mkdtemp(prefix='tmpbzrclone')
 
525
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
209
526
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
210
527
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
211
528
        from_repo.copy_content_into(tmp_repo)
218
535
            # all finished; write the tempfile out to the network
219
536
            temp.seek(0)
220
537
            return SuccessfulSmartServerResponse(('ok',), temp.read())
221
 
            # FIXME: Don't read the whole thing into memory here; rather stream it
222
 
            # out from the file onto the network. mbp 20070411
 
538
            # FIXME: Don't read the whole thing into memory here; rather stream
 
539
            # it out from the file onto the network. mbp 20070411
223
540
        finally:
224
541
            temp.close()
225
542
 
235
552
            dirname = dirname.encode(sys.getfilesystemencoding())
236
553
            # python's tarball module includes the whole path by default so
237
554
            # override it
238
 
            assert dirname.endswith('.bzr')
 
555
            if not dirname.endswith('.bzr'):
 
556
                raise ValueError(dirname)
239
557
            tarball.add(dirname, '.bzr') # recursive by default
240
558
        finally:
241
559
            tarball.close()
 
560
 
 
561
 
 
562
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
 
563
    """Insert a record stream from a RemoteSink into a repository.
 
564
 
 
565
    This gets bytes pushed to it by the network infrastructure and turns that
 
566
    into a bytes iterator using a thread. That is then processed by
 
567
    _byte_stream_to_stream.
 
568
 
 
569
    New in 1.14.
 
570
    """
 
571
 
 
572
    def do_repository_request(self, repository, resume_tokens, lock_token):
 
573
        """StreamSink.insert_stream for a remote repository."""
 
574
        repository.lock_write(token=lock_token)
 
575
        self.do_insert_stream_request(repository, resume_tokens)
 
576
 
 
577
    def do_insert_stream_request(self, repository, resume_tokens):
 
578
        tokens = [token for token in resume_tokens.split(' ') if token]
 
579
        self.tokens = tokens
 
580
        self.repository = repository
 
581
        self.queue = Queue.Queue()
 
582
        self.insert_thread = threading.Thread(target=self._inserter_thread)
 
583
        self.insert_thread.start()
 
584
 
 
585
    def do_chunk(self, body_stream_chunk):
 
586
        self.queue.put(body_stream_chunk)
 
587
 
 
588
    def _inserter_thread(self):
 
589
        try:
 
590
            src_format, stream = _byte_stream_to_stream(
 
591
                self.blocking_byte_stream())
 
592
            self.insert_result = self.repository._get_sink().insert_stream(
 
593
                stream, src_format, self.tokens)
 
594
            self.insert_ok = True
 
595
        except:
 
596
            self.insert_exception = sys.exc_info()
 
597
            self.insert_ok = False
 
598
 
 
599
    def blocking_byte_stream(self):
 
600
        while True:
 
601
            bytes = self.queue.get()
 
602
            if bytes is StopIteration:
 
603
                return
 
604
            else:
 
605
                yield bytes
 
606
 
 
607
    def do_end(self):
 
608
        self.queue.put(StopIteration)
 
609
        if self.insert_thread is not None:
 
610
            self.insert_thread.join()
 
611
        if not self.insert_ok:
 
612
            exc_info = self.insert_exception
 
613
            raise exc_info[0], exc_info[1], exc_info[2]
 
614
        write_group_tokens, missing_keys = self.insert_result
 
615
        if write_group_tokens or missing_keys:
 
616
            # bzip needed? missing keys should typically be a small set.
 
617
            # Should this be a streaming body response ?
 
618
            missing_keys = sorted(missing_keys)
 
619
            bytes = bencode.bencode((write_group_tokens, missing_keys))
 
620
            self.repository.unlock()
 
621
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
 
622
        else:
 
623
            self.repository.unlock()
 
624
            return SuccessfulSmartServerResponse(('ok', ))
 
625
 
 
626
 
 
627
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
 
628
    """Insert a record stream from a RemoteSink into an unlocked repository.
 
629
 
 
630
    This is the same as SmartServerRepositoryInsertStreamLocked, except it
 
631
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
 
632
    like pack format) repository.
 
633
 
 
634
    New in 1.13.
 
635
    """
 
636
 
 
637
    def do_repository_request(self, repository, resume_tokens):
 
638
        """StreamSink.insert_stream for a remote repository."""
 
639
        repository.lock_write()
 
640
        self.do_insert_stream_request(repository, resume_tokens)
 
641
 
 
642