~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
3210
3210
                yield data
3211
3211
 
3212
3212
 
3213
 
class _DirectPackAccess(object):
3214
 
    """Access to data in one or more packs with less translation."""
3215
 
 
3216
 
    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
3217
 
        """Create a _DirectPackAccess object.
3218
 
 
3219
 
        :param index_to_packs: A dict mapping index objects to the transport
3220
 
            and file names for obtaining data.
3221
 
        :param reload_func: A function to call if we determine that the pack
3222
 
            files have moved and we need to reload our caches. See
3223
 
            bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
3224
 
        """
3225
 
        self._container_writer = None
3226
 
        self._write_index = None
3227
 
        self._indices = index_to_packs
3228
 
        self._reload_func = reload_func
3229
 
        self._flush_func = flush_func
3230
 
 
3231
 
    def add_raw_records(self, key_sizes, raw_data):
3232
 
        """Add raw knit bytes to a storage area.
3233
 
 
3234
 
        The data is spooled to the container writer in one bytes-record per
3235
 
        raw data item.
3236
 
 
3237
 
        :param sizes: An iterable of tuples containing the key and size of each
3238
 
            raw data segment.
3239
 
        :param raw_data: A bytestring containing the data.
3240
 
        :return: A list of memos to retrieve the record later. Each memo is an
3241
 
            opaque index memo. For _DirectPackAccess the memo is (index, pos,
3242
 
            length), where the index field is the write_index object supplied
3243
 
            to the PackAccess object.
3244
 
        """
3245
 
        if type(raw_data) is not str:
3246
 
            raise AssertionError(
3247
 
                'data must be plain bytes was %s' % type(raw_data))
3248
 
        result = []
3249
 
        offset = 0
3250
 
        for key, size in key_sizes:
3251
 
            p_offset, p_length = self._container_writer.add_bytes_record(
3252
 
                raw_data[offset:offset+size], [])
3253
 
            offset += size
3254
 
            result.append((self._write_index, p_offset, p_length))
3255
 
        return result
3256
 
 
3257
 
    def flush(self):
3258
 
        """Flush pending writes on this access object.
3259
 
 
3260
 
        This will flush any buffered writes to a NewPack.
3261
 
        """
3262
 
        if self._flush_func is not None:
3263
 
            self._flush_func()
3264
 
            
3265
 
    def get_raw_records(self, memos_for_retrieval):
3266
 
        """Get the raw bytes for a records.
3267
 
 
3268
 
        :param memos_for_retrieval: An iterable containing the (index, pos,
3269
 
            length) memo for retrieving the bytes. The Pack access method
3270
 
            looks up the pack to use for a given record in its index_to_pack
3271
 
            map.
3272
 
        :return: An iterator over the bytes of the records.
3273
 
        """
3274
 
        # first pass, group into same-index requests
3275
 
        request_lists = []
3276
 
        current_index = None
3277
 
        for (index, offset, length) in memos_for_retrieval:
3278
 
            if current_index == index:
3279
 
                current_list.append((offset, length))
3280
 
            else:
3281
 
                if current_index is not None:
3282
 
                    request_lists.append((current_index, current_list))
3283
 
                current_index = index
3284
 
                current_list = [(offset, length)]
3285
 
        # handle the last entry
3286
 
        if current_index is not None:
3287
 
            request_lists.append((current_index, current_list))
3288
 
        for index, offsets in request_lists:
3289
 
            try:
3290
 
                transport, path = self._indices[index]
3291
 
            except KeyError:
3292
 
                # A KeyError here indicates that someone has triggered an index
3293
 
                # reload, and this index has gone missing, we need to start
3294
 
                # over.
3295
 
                if self._reload_func is None:
3296
 
                    # If we don't have a _reload_func there is nothing that can
3297
 
                    # be done
3298
 
                    raise
3299
 
                raise errors.RetryWithNewPacks(index,
3300
 
                                               reload_occurred=True,
3301
 
                                               exc_info=sys.exc_info())
3302
 
            try:
3303
 
                reader = pack.make_readv_reader(transport, path, offsets)
3304
 
                for names, read_func in reader.iter_records():
3305
 
                    yield read_func(None)
3306
 
            except errors.NoSuchFile:
3307
 
                # A NoSuchFile error indicates that a pack file has gone
3308
 
                # missing on disk, we need to trigger a reload, and start over.
3309
 
                if self._reload_func is None:
3310
 
                    raise
3311
 
                raise errors.RetryWithNewPacks(transport.abspath(path),
3312
 
                                               reload_occurred=False,
3313
 
                                               exc_info=sys.exc_info())
3314
 
 
3315
 
    def set_writer(self, writer, index, transport_packname):
3316
 
        """Set a writer to use for adding data."""
3317
 
        if index is not None:
3318
 
            self._indices[index] = transport_packname
3319
 
        self._container_writer = writer
3320
 
        self._write_index = index
3321
 
 
3322
 
    def reload_or_raise(self, retry_exc):
3323
 
        """Try calling the reload function, or re-raise the original exception.
3324
 
 
3325
 
        This should be called after _DirectPackAccess raises a
3326
 
        RetryWithNewPacks exception. This function will handle the common logic
3327
 
        of determining when the error is fatal versus being temporary.
3328
 
        It will also make sure that the original exception is raised, rather
3329
 
        than the RetryWithNewPacks exception.
3330
 
 
3331
 
        If this function returns, then the calling function should retry
3332
 
        whatever operation was being performed. Otherwise an exception will
3333
 
        be raised.
3334
 
 
3335
 
        :param retry_exc: A RetryWithNewPacks exception.
3336
 
        """
3337
 
        is_error = False
3338
 
        if self._reload_func is None:
3339
 
            is_error = True
3340
 
        elif not self._reload_func():
3341
 
            # The reload claimed that nothing changed
3342
 
            if not retry_exc.reload_occurred:
3343
 
                # If there wasn't an earlier reload, then we really were
3344
 
                # expecting to find changes. We didn't find them, so this is a
3345
 
                # hard error
3346
 
                is_error = True
3347
 
        if is_error:
3348
 
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
3349
 
            raise exc_class, exc_value, exc_traceback
3350
 
 
3351
 
 
3352
3213
def annotate_knit(knit, revision_id):
3353
3214
    """Annotate a knit with no cached annotations.
3354
3215