~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Jelmer Vernooij
  • Date: 2009-02-23 20:55:58 UTC
  • mfrom: (4034 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4053.
  • Revision ID: jelmer@samba.org-20090223205558-1cx2k4w1zgs8r5qa
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
import bz2
20
20
import os
 
21
import struct
21
22
import sys
 
23
import tarfile
22
24
import tempfile
23
 
import tarfile
 
25
import threading
 
26
import Queue
24
27
 
25
28
from bzrlib import (
26
29
    errors,
27
30
    osutils,
 
31
    pack,
28
32
    )
29
33
from bzrlib.bzrdir import BzrDir
30
34
from bzrlib.smart.request import (
32
36
    SmartServerRequest,
33
37
    SuccessfulSmartServerResponse,
34
38
    )
35
 
from bzrlib.repository import _strip_NULL_ghosts
 
39
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
36
40
from bzrlib import revision as _mod_revision
 
41
from bzrlib.util import bencode
 
42
from bzrlib.versionedfile import NetworkRecordStream
37
43
 
38
44
 
39
45
class SmartServerRepositoryRequest(SmartServerRequest):
41
47
 
42
48
    def do(self, path, *args):
43
49
        """Execute a repository request.
44
 
        
 
50
 
45
51
        All Repository requests take a path to the repository as their first
46
52
        argument.  The repository must be at the exact path given by the
47
53
        client - no searching is done.
106
112
 
107
113
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
108
114
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
109
 
    
 
115
 
110
116
    def do_repository_request(self, repository, *revision_ids):
111
117
        """Get parent details for some revisions.
112
 
        
 
118
 
113
119
        All the parents for revision_ids are returned. Additionally up to 64KB
114
120
        of additional parent data found by performing a breadth first search
115
121
        from revision_ids is returned. The verb takes a body containing the
191
197
 
192
198
 
193
199
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
194
 
    
 
200
 
195
201
    def do_readlocked_repository_request(self, repository, revision_id):
196
202
        """Return the result of repository.get_revision_graph(revision_id).
197
203
 
198
204
        Deprecated as of bzr 1.4, but supported for older clients.
199
 
        
 
205
 
200
206
        :param repository: The repository to query in.
201
207
        :param revision_id: The utf8 encoded revision_id to get a graph from.
202
208
        :return: A smart server response where the body contains an utf8
336
342
        return SuccessfulSmartServerResponse(('ok',))
337
343
 
338
344
 
 
345
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
 
346
 
 
347
    def do_repository_request(self, repository, str_bool_new_value):
 
348
        if str_bool_new_value == 'True':
 
349
            new_value = True
 
350
        else:
 
351
            new_value = False
 
352
        repository.set_make_working_trees(new_value)
 
353
        return SuccessfulSmartServerResponse(('ok',))
 
354
 
 
355
 
339
356
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
340
357
    """Get the raw repository files as a tarball.
341
358
 
342
359
    The returned tarball contains a .bzr control directory which in turn
343
360
    contains a repository.
344
 
    
345
 
    This takes one parameter, compression, which currently must be 
 
361
 
 
362
    This takes one parameter, compression, which currently must be
346
363
    "", "gz", or "bz2".
347
364
 
348
365
    This is used to implement the Repository.copy_content_into operation.
392
409
            tarball.add(dirname, '.bzr') # recursive by default
393
410
        finally:
394
411
            tarball.close()
 
412
 
 
413
 
 
414
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
 
415
 
 
416
    def do_repository_request(self, repository, resume_tokens):
 
417
        """StreamSink.insert_stream for a remote repository."""
 
418
        repository.lock_write()
 
419
        tokens = [token for token in resume_tokens.split(' ') if token]
 
420
        if tokens:
 
421
            repository.resume_write_group(tokens)
 
422
        else:
 
423
            repository.start_write_group()
 
424
        self.repository = repository
 
425
        self.stream_decoder = pack.ContainerPushParser()
 
426
        self.src_format = None
 
427
        self.queue = Queue.Queue()
 
428
        self.insert_thread = None
 
429
 
 
430
    def do_chunk(self, body_stream_chunk):
 
431
        self.stream_decoder.accept_bytes(body_stream_chunk)
 
432
        for record in self.stream_decoder.read_pending_records():
 
433
            record_names, record_bytes = record
 
434
            if self.src_format is None:
 
435
                src_format_name = record_bytes
 
436
                src_format = network_format_registry.get(src_format_name)
 
437
                self.src_format = src_format
 
438
                self.insert_thread = threading.Thread(target=self._inserter_thread)
 
439
                self.insert_thread.start()
 
440
            else:
 
441
                record_name, = record_names
 
442
                substream_type = record_name[0]
 
443
                stream = NetworkRecordStream([record_bytes])
 
444
                for record in stream.read():
 
445
                    self.queue.put((substream_type, [record]))
 
446
 
 
447
    def _inserter_thread(self):
 
448
        self.repository._get_sink().insert_stream(self.blocking_read_stream(),
 
449
                self.src_format)
 
450
 
 
451
    def blocking_read_stream(self):
 
452
        while True:
 
453
            item = self.queue.get()
 
454
            if item is StopIteration:
 
455
                return
 
456
            else:
 
457
                yield item
 
458
 
 
459
    def do_end(self):
 
460
        self.queue.put(StopIteration)
 
461
        if self.insert_thread is not None:
 
462
            self.insert_thread.join()
 
463
        try:
 
464
            missing_keys = set()
 
465
            for prefix, versioned_file in (
 
466
                ('texts', self.repository.texts),
 
467
                ('inventories', self.repository.inventories),
 
468
                ('revisions', self.repository.revisions),
 
469
                ('signatures', self.repository.signatures),
 
470
                ):
 
471
                missing_keys.update((prefix,) + key for key in
 
472
                    versioned_file.get_missing_compression_parent_keys())
 
473
        except NotImplementedError:
 
474
            # cannot even attempt suspending.
 
475
            pass
 
476
        else:
 
477
            if missing_keys:
 
478
                # suspend the write group and tell the caller what we is
 
479
                # missing. We know we can suspend or else we would not have
 
480
                # entered this code path. (All repositories that can handle
 
481
                # missing keys can handle suspending a write group).
 
482
                write_group_tokens = self.repository.suspend_write_group()
 
483
                # bzip needed? missing keys should typically be a small set.
 
484
                # Should this be a streaming body response ?
 
485
                missing_keys = sorted(missing_keys)
 
486
                bytes = bencode.bencode((write_group_tokens, missing_keys))
 
487
                return SuccessfulSmartServerResponse(('missing-basis', bytes))
 
488
        # All finished.
 
489
        self.repository.commit_write_group()
 
490
        self.repository.unlock()
 
491
        return SuccessfulSmartServerResponse(('ok', ))