class _DirectPackAccess(object):
    """Access to data in one or more packs with less translation."""

    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
        """Create a _DirectPackAccess object.

        :param index_to_packs: A dict mapping index objects to the transport
            and file names for obtaining data.
        :param reload_func: A function to call if we determine that the pack
            files have moved and we need to reload our caches. See
            bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
        """
        self._container_writer = None
        self._write_index = None
        self._indices = index_to_packs
        self._reload_func = reload_func
        self._flush_func = flush_func

    def add_raw_records(self, key_sizes, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to the container writer in one bytes-record per
        raw data item.

        :param key_sizes: An iterable of tuples containing the key and size of
            each raw data segment.
        :param raw_data: A bytestring containing the data.
        :return: A list of memos to retrieve the record later. Each memo is an
            opaque index memo. For _DirectPackAccess the memo is (index, pos,
            length), where the index field is the write_index object supplied
            to the PackAccess object.
        """
        if type(raw_data) != str:
            raise AssertionError(
                'data must be plain bytes was %s' % type(raw_data))
        result = []
        offset = 0
        for key, size in key_sizes:
            p_offset, p_length = self._container_writer.add_bytes_record(
                raw_data[offset:offset+size], [])
            offset += size
            result.append((self._write_index, p_offset, p_length))
        return result
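
    # Illustrative sketch (hypothetical keys and sizes, not from the original
    # source): given two buffered texts of 5 and 8 bytes,
    #   memos = access.add_raw_records([(key1, 5), (key2, 8)],
    #                                  'abcde' + 'abcdefgh')
    # returns one opaque (write_index, offset, length) memo per key, which
    # can later be handed back to get_raw_records().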

    def flush(self):
        """Flush pending writes on this access object.

        This will flush any buffered writes to a NewPack.
        """
        if self._flush_func is not None:
            self._flush_func()

    def get_raw_records(self, memos_for_retrieval):
        """Get the raw bytes for a list of records.

        :param memos_for_retrieval: An iterable containing the (index, pos,
            length) memo for retrieving the bytes. The Pack access method
            looks up the pack to use for a given record in its index_to_pack
            map.
        :return: An iterator over the bytes of the records.
        """
        # first pass, group into same-index requests
        request_lists = []
        current_index = None
        for (index, offset, length) in memos_for_retrieval:
            if current_index == index:
                current_list.append((offset, length))
            else:
                if current_index is not None:
                    request_lists.append((current_index, current_list))
                current_index = index
                current_list = [(offset, length)]
        # handle the last entry
        if current_index is not None:
            request_lists.append((current_index, current_list))
        for index, offsets in request_lists:
            try:
                transport, path = self._indices[index]
            except KeyError:
                # A KeyError here indicates that someone has triggered an index
                # reload, and this index has gone missing, we need to start
                # over.
                if self._reload_func is None:
                    # If we don't have a _reload_func there is nothing that can
                    # be done
                    raise
                raise errors.RetryWithNewPacks(index,
                                               reload_occurred=True,
                                               exc_info=sys.exc_info())
            try:
                reader = pack.make_readv_reader(transport, path, offsets)
                for names, read_func in reader.iter_records():
                    yield read_func(None)
            except errors.NoSuchFile:
                # A NoSuchFile error indicates that a pack file has gone
                # missing on disk, we need to trigger a reload, and start over.
                if self._reload_func is None:
                    raise
                raise errors.RetryWithNewPacks(transport.abspath(path),
                                               reload_occurred=False,
                                               exc_info=sys.exc_info())
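
    # Example of the grouping pass above (illustrative values): consecutive
    # memos for the same index, e.g.
    #   [(idx_a, 0, 10), (idx_a, 10, 5), (idx_b, 0, 7)]
    # are grouped into the request lists
    #   [(idx_a, [(0, 10), (10, 5)]), (idx_b, [(0, 7)])]
    # so that each pack file is read with a single readv-style request.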

    def set_writer(self, writer, index, transport_packname):
        """Set a writer to use for adding data."""
        if index is not None:
            self._indices[index] = transport_packname
        self._container_writer = writer
        self._write_index = index

    def reload_or_raise(self, retry_exc):
        """Try calling the reload function, or re-raise the original exception.

        This should be called after _DirectPackAccess raises a
        RetryWithNewPacks exception. This function will handle the common logic
        of determining when the error is fatal versus being temporary.
        It will also make sure that the original exception is raised, rather
        than the RetryWithNewPacks exception.

        If this function returns, then the calling function should retry
        whatever operation was being performed. Otherwise an exception will
        be raised.

        :param retry_exc: A RetryWithNewPacks exception.
        """
        is_error = False
        if self._reload_func is None:
            is_error = True
        elif not self._reload_func():
            # The reload claimed that nothing changed
            if not retry_exc.reload_occurred:
                # If there wasn't an earlier reload, then we really were
                # expecting to find changes. We didn't find them, so this is a
                # hard error.
                is_error = True
        if is_error:
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
            raise exc_class, exc_value, exc_traceback
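
    # Typical caller pattern (a sketch; the same loop appears in
    # _KnitAnnotator._get_needed_texts below):
    #   while True:
    #       try:
    #           return do_pack_operation()
    #       except errors.RetryWithNewPacks, e:
    #           access.reload_or_raise(e)  # returns only if retrying is sane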


# Deprecated, use PatienceSequenceMatcher instead
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher


def annotate_knit(knit, revision_id):
    """Annotate a knit with no cached annotations.

    This implementation is for knits with no cached annotations.
    It will work for knits with cached annotations, but this is not
    recommended.
    """
    annotator = _KnitAnnotator(knit)
    return iter(annotator.annotate_flat(revision_id))


class _KnitAnnotator(annotate.Annotator):
    """Build up the annotations for a text."""

    def __init__(self, vf):
        annotate.Annotator.__init__(self, vf)

        # TODO: handle Nodes which cannot be extracted
        # self._ghosts = set()

        # Map from (key, parent_key) => matching_blocks, should be 'use once'
        self._matching_blocks = {}

        # KnitContent objects
        self._content_objects = {}
        # The number of children that depend on this fulltext content object
        self._num_compression_children = {}
        # Delta records that need their compression parent before they can be
        # expanded
        self._pending_deltas = {}
        # Fulltext records that are waiting for their parents fulltexts before
        # they can be yielded for annotation
        self._pending_annotation = {}

        self._all_build_details = {}

    def _get_build_graph(self, key):
        """Get the graphs for building texts and annotations.

        The data you need for creating a full text may be different than the
        data you need to annotate it. (At a minimum, you need both parents to
        create an annotation, but only need 1 parent to generate the
        fulltext.)

        :return: A list of (key, index_memo) records, suitable for
            passing to read_records_iter to start reading in the raw data from
            the pack file.
        """
        pending = set([key])
        records = []
        ann_keys = set()
        self._num_needed_children[key] = 1
        while pending:
            # get all pending nodes
            this_iteration = pending
            build_details = self._vf._index.get_build_details(this_iteration)
            self._all_build_details.update(build_details)
            # new_nodes = self._vf._index._get_entries(this_iteration)
            pending = set()
            for key, details in build_details.iteritems():
                (index_memo, compression_parent, parent_keys,
                 record_details) = details
                self._parent_map[key] = parent_keys
                self._heads_provider = None
                records.append((key, index_memo))
                # Do we actually need to check _annotated_lines?
                pending.update([p for p in parent_keys
                                if p not in self._all_build_details])
                if parent_keys:
                    for parent_key in parent_keys:
                        if parent_key in self._num_needed_children:
                            self._num_needed_children[parent_key] += 1
                        else:
                            self._num_needed_children[parent_key] = 1
                if compression_parent:
                    if compression_parent in self._num_compression_children:
                        self._num_compression_children[compression_parent] += 1
                    else:
                        self._num_compression_children[compression_parent] = 1

            missing_versions = this_iteration.difference(build_details.keys())
            if missing_versions:
                for key in missing_versions:
                    if key in self._parent_map and key in self._text_cache:
                        # We already have this text ready, we just need to
                        # yield it later so we get it annotated
                        ann_keys.add(key)
                        parent_keys = self._parent_map[key]
                        for parent_key in parent_keys:
                            if parent_key in self._num_needed_children:
                                self._num_needed_children[parent_key] += 1
                            else:
                                self._num_needed_children[parent_key] = 1
                        pending.update([p for p in parent_keys
                                        if p not in self._all_build_details])
                    else:
                        raise errors.RevisionNotPresent(key, self._vf)
        # Generally we will want to read the records in reverse order, because
        # we find the parent nodes after the children
        records.reverse()
        return records, ann_keys
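
    # Sketch of the return contract: 'records' is a list of (key, index_memo)
    # pairs ordered parents-before-children (hence the records.reverse()
    # above), while 'ann_keys' holds keys whose text already sits in
    # self._text_cache and only needs to be yielded again for annotation.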

    def _get_needed_texts(self, key, pb=None):
        # if True or len(self._vf._immediate_fallback_vfs) > 0:
        if len(self._vf._immediate_fallback_vfs) > 0:
            # If we have fallbacks, go to the generic path
            for v in annotate.Annotator._get_needed_texts(self, key, pb=pb):
                yield v
            return
        while True:
            try:
                records, ann_keys = self._get_build_graph(key)
                for idx, (sub_key, text, num_lines) in enumerate(
                        self._extract_texts(records)):
                    if pb is not None:
                        pb.update(gettext('annotating'), idx, len(records))
                    yield sub_key, text, num_lines
                for sub_key in ann_keys:
                    text = self._text_cache[sub_key]
                    num_lines = len(text)  # bad assumption
                    yield sub_key, text, num_lines
                return
            except errors.RetryWithNewPacks, e:
                self._vf._access.reload_or_raise(e)
                # The cached build_details are no longer valid
                self._all_build_details.clear()

    def _cache_delta_blocks(self, key, compression_parent, delta, lines):
        parent_lines = self._text_cache[compression_parent]
        blocks = list(KnitContent.get_line_delta_blocks(
            delta, parent_lines, lines))
        self._matching_blocks[(key, compression_parent)] = blocks

    def _expand_record(self, key, parent_keys, compression_parent, record,
                       record_details):
        delta = None
        if compression_parent:
            if compression_parent not in self._content_objects:
                # Waiting for the parent
                self._pending_deltas.setdefault(compression_parent, []).append(
                    (key, parent_keys, record, record_details))
                return None
            # We have the basis parent, so expand the delta
            num = self._num_compression_children[compression_parent]
            num -= 1
            if num == 0:
                base_content = self._content_objects.pop(compression_parent)
                self._num_compression_children.pop(compression_parent)
            else:
                self._num_compression_children[compression_parent] = num
                base_content = self._content_objects[compression_parent]
            # It is tempting to want to copy_base_content=False for the last
            # child object. However, whenever noeol=False,
            # self._text_cache[parent_key] is content._lines. So mutating it
            # gives very bad results.
            # The alternative is to copy the lines into text cache, but then we
            # are copying anyway, so just do it here.
            content, delta = self._vf._factory.parse_record(
                key, record, record_details, base_content,
                copy_base_content=True)
        else:
            # Fulltext record
            content, _ = self._vf._factory.parse_record(
                key, record, record_details, None)
        if self._num_compression_children.get(key, 0) > 0:
            self._content_objects[key] = content
        lines = content.text()
        self._text_cache[key] = lines
        if delta is not None:
            self._cache_delta_blocks(key, compression_parent, delta, lines)
        return lines
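
    # Worked example (hypothetical keys): if B is stored as a line-delta
    # against fulltext A, expanding A keeps its KnitContent in
    # self._content_objects while _num_compression_children[A] > 0;
    # expanding B then reuses that base content, and pops it once no other
    # compression children still need it.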

    def _get_parent_annotations_and_matches(self, key, text, parent_key):
        """Get the list of annotations for the parent, and the matching lines.

        :param text: The opaque value given by _get_needed_texts
        :param parent_key: The key for the parent text
        :return: (parent_annotations, matching_blocks)
            parent_annotations is a list as long as the number of lines in
                parent
            matching_blocks is a list of (parent_idx, text_idx, len) tuples
                indicating which lines match between the two texts
        """
        block_key = (key, parent_key)
        if block_key in self._matching_blocks:
            blocks = self._matching_blocks.pop(block_key)
            parent_annotations = self._annotations_cache[parent_key]
            return parent_annotations, blocks
        return annotate.Annotator._get_parent_annotations_and_matches(self,
            key, text, parent_key)
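
    # The cached blocks come from _cache_delta_blocks(), i.e. they were
    # derived from the stored knit delta rather than recomputed by a sequence
    # matcher; popping them here keeps the cache 'use once', as noted in
    # __init__.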

    def _process_pending(self, key):
        """The content for 'key' was just processed.

        Determine if there is any more pending work to be processed.
        """
        to_return = []
        if key in self._pending_deltas:
            compression_parent = key
            children = self._pending_deltas.pop(key)
            for child_key, parent_keys, record, record_details in children:
                lines = self._expand_record(child_key, parent_keys,
                                            compression_parent,
                                            record, record_details)
                if self._check_ready_for_annotations(child_key, parent_keys):
                    to_return.append(child_key)
        # Also check any children that are waiting for this parent to be
        # annotation ready
        if key in self._pending_annotation:
            children = self._pending_annotation.pop(key)
            to_return.extend([c for c, p_keys in children
                              if self._check_ready_for_annotations(c, p_keys)])
        return to_return

    def _check_ready_for_annotations(self, key, parent_keys):
        """Return True if this text is ready to be yielded.

        Otherwise, this will return False, and queue the text into
        self._pending_annotation
        """
        for parent_key in parent_keys:
            if parent_key not in self._annotations_cache:
                # still waiting on at least one parent text, so queue it up
                # Note that if there are multiple parents, we need to wait
                # for all of them.
                self._pending_annotation.setdefault(parent_key,
                    []).append((key, parent_keys))
                return False
        return True
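
    # For instance (hypothetical keys): a text C with parents (A, B) where
    # only A is annotated so far gets queued under its first missing parent:
    #   self._pending_annotation == {B: [(C, (A, B))]}
    # and is re-checked, via _process_pending(), once B becomes ready.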

    def _extract_texts(self, records):
        """Extract the various texts needed based on records"""
        # We iterate in the order read, rather than a strict order requested
        # However, process what we can, and put off to the side things that
        # still need parents, cleaning them up when those parents are
        # processed.
        # 1) As 'records' are read, see if we can expand these records into
        #    Content objects (and thus lines)
        # 2) If a given line-delta is waiting on its compression parent, it
        #    gets queued up into self._pending_deltas, otherwise we expand
        #    it, and put it into self._text_cache and self._content_objects
        # 3) If we expanded the text, we will then check to see if all
        #    parents have also been processed. If so, this text gets yielded,
        #    else this record gets set aside into pending_annotation
        # 4) Further, if we expanded the text in (2), we will then check to
        #    see if there are any children in self._pending_deltas waiting to
        #    also be processed. If so, we go back to (2) for those
        # 5) Further again, if we yielded the text, we can then check if that
        #    'unlocks' any of the texts in pending_annotations, which should
        #    then get yielded as well
        # Note that both steps 4 and 5 are 'recursive' in that unlocking one
        # compression child could unlock yet another, and yielding a fulltext
        # will also 'unlock' the children that are waiting on that annotation.
        # (Though also, unlocking 1 parent's fulltext, does not unlock a child
        # if other parents are also waiting.)
        # We want to yield content before expanding child content objects, so
        # that we know when we can re-use the content lines, and the annotation
        # code can know when it can stop caching fulltexts, as well.

        # Children that are missing their compression parent
        for (key, record, digest) in self._vf._read_records_iter(records):
            details = self._all_build_details[key]
            (_, compression_parent, parent_keys, record_details) = details
            lines = self._expand_record(key, parent_keys, compression_parent,
                                        record, record_details)
            if lines is None:
                # Pending delta should be queued up
                continue
            # At this point, we may be able to yield this content, if all
            # parents are also finished
            yield_this_text = self._check_ready_for_annotations(key,
                                                                parent_keys)
            if yield_this_text:
                # All parents present
                yield key, lines, len(lines)
            to_process = self._process_pending(key)
            while to_process:
                this_process = to_process
                to_process = []
                for key in this_process:
                    lines = self._text_cache[key]
                    yield key, lines, len(lines)
                    to_process.extend(self._process_pending(key))
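
    # The inner while-loop above drains the pending queues iteratively: each
    # text yielded may unlock further compression children (step 4 in the
    # algorithm notes) and annotation children (step 5), which in turn may
    # unlock more.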


try:
    from bzrlib._knit_load_data_pyx import _load_data_c as _load_data
except ImportError, e:
    osutils.failed_to_load_extension(e)
    from bzrlib._knit_load_data_py import _load_data_py as _load_data