# Copyright (C) 2005, 2006, 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""Knit versionedfile implementation.

A knit is a versioned file implementation that supports efficient append only
writes.

lifeless: the data file is made up of "delta records". each delta record has a delta header
that contains: (1) a version id, (2) the size of the delta (in lines), and (3) the digest of
the -expanded data- (ie, the delta applied to the parent). the delta also ends with an
end-marker; simply "end VERSION"

delta can be line or full contents.
... the 8's there are the index number of the annotation.
version robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 7 c7d23b2a5bd6ca00e8e266cec0ec228158ee9f9e
8 e.set('executable', 'yes')
8 if elt.get('executable') == 'yes':
8 ie.executable = True
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad

09:33 < jrydberg> lifeless: each index is made up of a tuple of; version id, options, position, size, parents
09:33 < jrydberg> lifeless: the parents are currently dictionary compressed
09:33 < jrydberg> lifeless: (meaning it currently does not support ghosts)
09:33 < lifeless> right
09:33 < jrydberg> lifeless: the position and size is the range in the data file

so the index sequence is the dictionary compressed sequence number used
in the deltas to provide line annotation

"""

# 10:16 < lifeless> make partial index writes safe
# 10:16 < lifeless> implement 'knit.check()' like weave.check()
# 10:17 < lifeless> record known ghosts so we can detect when they are filled in rather than the current 'reweave'
# move sha1 out of the content so that join is faster at verifying parents
# record content length ?

from cStringIO import StringIO
from itertools import izip, chain

from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
from bzrlib.errors import (
    RevisionAlreadyPresent,
from bzrlib.osutils import (
from bzrlib.versionedfile import (
    AbsentContentFactory,
    FulltextContentFactory,

# TODO: Split out code specific to this format into an associated object.

# TODO: Can we put in some kind of value to check that the index and data
#       files belong together?

# TODO: accommodate binaries, perhaps by storing a byte count

# TODO: function to check whole file

# TODO: atomically append data, then measure backwards from the cursor
#       position after writing to work out where it was located. we may need to
#       bypass python file buffering.

DATA_SUFFIX = '.knit'
INDEX_SUFFIX = '.kndx'
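
# Illustrative sketch (not part of the knit API): a knit for a file id
# 'inventory' is stored as two files using the suffixes above.  Assuming that
# name, the layout looks roughly like:
#
#   inventory.kndx   - the index: a header line, then one line per record,
#                      e.g. "rev-1 fulltext 0 123  :"
#   inventory.knit   - the data: gzip-compressed records, each of the form
#                      "version rev-1 3 <sha1>\n...lines...\nend rev-1\n"
#
# The exact field layouts are described in the module docstring above and in
# the _KndxIndex docstring below.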
135
class KnitAdapter(object):
136
"""Base class for knit record adaption."""
138
def __init__(self, basis_vf):
139
"""Create an adapter which accesses full texts from basis_vf.
141
:param basis_vf: A versioned file to access basis texts of deltas from.
142
May be None for adapters that do not need to access basis texts.
144
self._data = KnitVersionedFiles(None, None)
145
self._annotate_factory = KnitAnnotateFactory()
146
self._plain_factory = KnitPlainFactory()
147
self._basis_vf = basis_vf
150
class FTAnnotatedToUnannotated(KnitAdapter):
151
"""An adapter from FT annotated knits to unannotated ones."""
153
def get_bytes(self, factory, annotated_compressed_bytes):
155
rec, contents = self._data._parse_record_unchecked(annotated_compressed_bytes)
156
content = self._annotate_factory.parse_fulltext(contents, rec[1])
157
size, bytes = self._data._record_to_data((rec[1],), rec[3], content.text())
161
class DeltaAnnotatedToUnannotated(KnitAdapter):
162
"""An adapter for deltas from annotated to unannotated."""
164
def get_bytes(self, factory, annotated_compressed_bytes):
166
rec, contents = self._data._parse_record_unchecked(annotated_compressed_bytes)
167
delta = self._annotate_factory.parse_line_delta(contents, rec[1],
169
contents = self._plain_factory.lower_line_delta(delta)
170
size, bytes = self._data._record_to_data((rec[1],), rec[3], contents)
174
class FTAnnotatedToFullText(KnitAdapter):
175
"""An adapter from FT annotated knits to unannotated ones."""
177
def get_bytes(self, factory, annotated_compressed_bytes):
179
rec, contents = self._data._parse_record_unchecked(annotated_compressed_bytes)
180
content, delta = self._annotate_factory.parse_record(factory.key[-1],
181
contents, factory._build_details, None)
182
return ''.join(content.text())
185
class DeltaAnnotatedToFullText(KnitAdapter):
186
"""An adapter for deltas from annotated to unannotated."""
188
def get_bytes(self, factory, annotated_compressed_bytes):
190
rec, contents = self._data._parse_record_unchecked(annotated_compressed_bytes)
191
delta = self._annotate_factory.parse_line_delta(contents, rec[1],
193
compression_parent = factory.parents[0]
194
basis_entry = self._basis_vf.get_record_stream(
195
[compression_parent], 'unordered', True).next()
196
if basis_entry.storage_kind == 'absent':
197
raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
198
basis_lines = split_lines(basis_entry.get_bytes_as('fulltext'))
199
# Manually apply the delta because we have one annotated content and
201
basis_content = PlainKnitContent(basis_lines, compression_parent)
202
basis_content.apply_delta(delta, rec[1])
203
basis_content._should_strip_eol = factory._build_details[1]
204
return ''.join(basis_content.text())
207
class FTPlainToFullText(KnitAdapter):
208
"""An adapter from FT plain knits to unannotated ones."""
210
def get_bytes(self, factory, compressed_bytes):
212
rec, contents = self._data._parse_record_unchecked(compressed_bytes)
213
content, delta = self._plain_factory.parse_record(factory.key[-1],
214
contents, factory._build_details, None)
215
return ''.join(content.text())
218
class DeltaPlainToFullText(KnitAdapter):
219
"""An adapter for deltas from annotated to unannotated."""
221
def get_bytes(self, factory, compressed_bytes):
223
rec, contents = self._data._parse_record_unchecked(compressed_bytes)
224
delta = self._plain_factory.parse_line_delta(contents, rec[1])
225
compression_parent = factory.parents[0]
226
# XXX: string splitting overhead.
227
basis_entry = self._basis_vf.get_record_stream(
228
[compression_parent], 'unordered', True).next()
229
if basis_entry.storage_kind == 'absent':
230
raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
231
basis_lines = split_lines(basis_entry.get_bytes_as('fulltext'))
232
basis_content = PlainKnitContent(basis_lines, compression_parent)
233
# Manually apply the delta because we have one annotated content and
235
content, _ = self._plain_factory.parse_record(rec[1], contents,
236
factory._build_details, basis_content)
237
return ''.join(content.text())
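
# Illustrative sketch (not part of this module's API): how the adapters above
# are typically driven.  The record stream machinery looks adapters up by
# (source storage_kind, target storage_kind); here 'basis_vf' is a
# hypothetical VersionedFiles that holds the basis texts needed by the delta
# adapters, and may be None for the fulltext ones.
#
#   def _adapt_to_fulltext(record, raw_bytes, basis_vf):
#       if record.storage_kind == 'knit-annotated-ft-gz':
#           adapter = FTAnnotatedToFullText(None)
#       elif record.storage_kind == 'knit-annotated-delta-gz':
#           adapter = DeltaAnnotatedToFullText(basis_vf)
#       else:
#           raise KeyError(record.storage_kind)
#       return adapter.get_bytes(record, raw_bytes)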
240
class KnitContentFactory(ContentFactory):
241
"""Content factory for streaming from knits.
243
:seealso ContentFactory:
246
def __init__(self, key, parents, build_details, sha1, raw_record,
247
annotated, knit=None):
248
"""Create a KnitContentFactory for key.
251
:param parents: The parents.
252
:param build_details: The build details as returned from
254
:param sha1: The sha1 expected from the full text of this object.
255
:param raw_record: The bytes of the knit data from disk.
256
:param annotated: True if the raw data is annotated.
258
ContentFactory.__init__(self)
261
self.parents = parents
262
if build_details[0] == 'line-delta':
267
annotated_kind = 'annotated-'
270
self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
271
self._raw_record = raw_record
272
self._build_details = build_details
275
def get_bytes_as(self, storage_kind):
276
if storage_kind == self.storage_kind:
277
return self._raw_record
278
if storage_kind == 'fulltext' and self._knit is not None:
279
return self._knit.get_text(self.key[0])
281
raise errors.UnavailableRepresentation(self.key, storage_kind,
285
class KnitContent(object):
286
"""Content of a knit version to which deltas can be applied.
288
This is always stored in memory as a list of lines with \n at the end,
289
plus a flag saying if the final ending is really there or not, because that
290
corresponds to the on-disk knit representation.
294
self._should_strip_eol = False
296
def apply_delta(self, delta, new_version_id):
297
"""Apply delta to this object to become new_version_id."""
298
raise NotImplementedError(self.apply_delta)
300
def line_delta_iter(self, new_lines):
301
"""Generate line-based delta from this content to new_lines."""
302
new_texts = new_lines.text()
303
old_texts = self.text()
304
s = patiencediff.PatienceSequenceMatcher(None, old_texts, new_texts)
305
for tag, i1, i2, j1, j2 in s.get_opcodes():
308
# ofrom, oto, length, data
309
yield i1, i2, j2 - j1, new_lines._lines[j1:j2]
311
def line_delta(self, new_lines):
312
return list(self.line_delta_iter(new_lines))
315
def get_line_delta_blocks(knit_delta, source, target):
316
"""Extract SequenceMatcher.get_matching_blocks() from a knit delta"""
317
target_len = len(target)
320
for s_begin, s_end, t_len, new_text in knit_delta:
321
true_n = s_begin - s_pos
324
# knit deltas do not provide reliable info about whether the
325
# last line of a file matches, due to eol handling.
326
if source[s_pos + n -1] != target[t_pos + n -1]:
329
yield s_pos, t_pos, n
330
t_pos += t_len + true_n
332
n = target_len - t_pos
334
if source[s_pos + n -1] != target[t_pos + n -1]:
337
yield s_pos, t_pos, n
338
yield s_pos + (target_len - t_pos), target_len, 0
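
# Illustrative sketch (not part of the class API): a knit line delta is a
# sequence of (start, end, count, new_lines) tuples against the parent text.
# Applying [(1, 3, 1, ['y\n'])] to ['a\n', 'b\n', 'c\n', 'd\n'] replaces
# lines 1..3 with the single line 'y\n', giving ['a\n', 'y\n', 'd\n'] --
# which is what apply_delta() on the concrete subclasses below does (with
# (origin, text) pairs instead of plain lines in the annotated case).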
341
class AnnotatedKnitContent(KnitContent):
342
"""Annotated content."""
344
def __init__(self, lines):
345
KnitContent.__init__(self)
349
"""Return a list of (origin, text) for each content line."""
350
lines = self._lines[:]
351
if self._should_strip_eol:
352
origin, last_line = lines[-1]
353
lines[-1] = (origin, last_line.rstrip('\n'))
356
def apply_delta(self, delta, new_version_id):
357
"""Apply delta to this object to become new_version_id."""
360
for start, end, count, delta_lines in delta:
361
lines[offset+start:offset+end] = delta_lines
362
offset = offset + (start - end) + count
366
lines = [text for origin, text in self._lines]
367
except ValueError, e:
368
# most commonly (only?) caused by the internal form of the knit
369
# missing annotation information because of a bug - see thread
371
raise KnitCorrupt(self,
372
"line in annotated knit missing annotation information: %s"
374
if self._should_strip_eol:
375
lines[-1] = lines[-1].rstrip('\n')
379
return AnnotatedKnitContent(self._lines[:])
382
class PlainKnitContent(KnitContent):
383
"""Unannotated content.
385
When annotate[_iter] is called on this content, the same version is reported
386
for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
390
def __init__(self, lines, version_id):
391
KnitContent.__init__(self)
393
self._version_id = version_id
396
"""Return a list of (origin, text) for each content line."""
397
return [(self._version_id, line) for line in self._lines]
399
def apply_delta(self, delta, new_version_id):
400
"""Apply delta to this object to become new_version_id."""
403
for start, end, count, delta_lines in delta:
404
lines[offset+start:offset+end] = delta_lines
405
offset = offset + (start - end) + count
406
self._version_id = new_version_id
409
return PlainKnitContent(self._lines[:], self._version_id)
413
if self._should_strip_eol:
415
lines[-1] = lines[-1].rstrip('\n')
419
class _KnitFactory(object):
420
"""Base class for common Factory functions."""
422
def parse_record(self, version_id, record, record_details,
423
base_content, copy_base_content=True):
424
"""Parse a record into a full content object.
426
:param version_id: The official version id for this content
427
:param record: The data returned by read_records_iter()
428
:param record_details: Details about the record returned by
430
:param base_content: If get_build_details returns a compression_parent,
431
you must return a base_content here, else use None
432
:param copy_base_content: When building from the base_content, decide
433
you can either copy it and return a new object, or modify it in
435
:return: (content, delta) A Content object and possibly a line-delta,
438
method, noeol = record_details
439
if method == 'line-delta':
440
if copy_base_content:
441
content = base_content.copy()
443
content = base_content
444
delta = self.parse_line_delta(record, version_id)
445
content.apply_delta(delta, version_id)
447
content = self.parse_fulltext(record, version_id)
449
content._should_strip_eol = noeol
450
return (content, delta)
453
class KnitAnnotateFactory(_KnitFactory):
454
"""Factory for creating annotated Content objects."""
458
def make(self, lines, version_id):
459
num_lines = len(lines)
460
return AnnotatedKnitContent(zip([version_id] * num_lines, lines))
462
def parse_fulltext(self, content, version_id):
463
"""Convert fulltext to internal representation
465
fulltext content is of the format
466
revid(utf8) plaintext\n
467
internal representation is of the format:
470
# TODO: jam 20070209 The tests expect this to be returned as tuples,
471
# but the code itself doesn't really depend on that.
472
# Figure out a way to not require the overhead of turning the
473
# list back into tuples.
474
lines = [tuple(line.split(' ', 1)) for line in content]
475
return AnnotatedKnitContent(lines)
477
def parse_line_delta_iter(self, lines):
478
return iter(self.parse_line_delta(lines))
480
def parse_line_delta(self, lines, version_id, plain=False):
481
"""Convert a line based delta into internal representation.
483
line delta is in the form of:
484
intstart intend intcount
486
revid(utf8) newline\n
487
internal representation is
488
(start, end, count, [1..count tuples (revid, newline)])
490
:param plain: If True, the lines are returned as a plain
491
list without annotations, not as a list of (origin, content) tuples, i.e.
492
(start, end, count, [1..count newline])
499
def cache_and_return(line):
500
origin, text = line.split(' ', 1)
501
return cache.setdefault(origin, origin), text
503
# walk through the lines parsing.
504
# Note that the plain test is explicitly pulled out of the
505
# loop to minimise any performance impact
508
start, end, count = [int(n) for n in header.split(',')]
509
contents = [next().split(' ', 1)[1] for i in xrange(count)]
510
result.append((start, end, count, contents))
513
start, end, count = [int(n) for n in header.split(',')]
514
contents = [tuple(next().split(' ', 1)) for i in xrange(count)]
515
result.append((start, end, count, contents))
518
def get_fulltext_content(self, lines):
519
"""Extract just the content lines from a fulltext."""
520
return (line.split(' ', 1)[1] for line in lines)
522
def get_linedelta_content(self, lines):
523
"""Extract just the content from a line delta.
525
This doesn't return all of the extra information stored in a delta.
526
Only the actual content lines.
531
header = header.split(',')
532
count = int(header[2])
533
for i in xrange(count):
534
origin, text = next().split(' ', 1)
537
def lower_fulltext(self, content):
538
"""convert a fulltext content record into a serializable form.
540
see parse_fulltext which this inverts.
542
# TODO: jam 20070209 We only do the caching thing to make sure that
543
# the origin is a valid utf-8 line, eventually we could remove it
544
return ['%s %s' % (o, t) for o, t in content._lines]
546
def lower_line_delta(self, delta):
547
"""convert a delta into a serializable form.
549
See parse_line_delta which this inverts.
551
# TODO: jam 20070209 We only do the caching thing to make sure that
552
# the origin is a valid utf-8 line, eventually we could remove it
554
for start, end, c, lines in delta:
555
out.append('%d,%d,%d\n' % (start, end, c))
556
out.extend(origin + ' ' + text
557
for origin, text in lines)
560
def annotate(self, knit, key):
561
content = knit._get_content(key)
562
# adjust for the fact that serialised annotations are only key suffixes
564
if type(key) == tuple:
566
origins = content.annotate()
568
for origin, line in origins:
569
result.append((prefix + (origin,), line))
572
# XXX: This smells a bit. Why would key ever be a non-tuple here?
573
# Aren't keys defined to be tuples? -- spiv 20080618
574
return content.annotate()
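
# Illustrative sketch (not part of the factory API): the serialised forms
# handled by KnitAnnotateFactory.  A fulltext stores one "origin text" pair
# per line; a line delta stores a "start,end,count" header followed by
# 'count' such pairs, e.g. (revision ids shortened for readability):
#
#   fulltext:    "rev-1 import os\n"
#   line delta:  "1,3,1\n"
#                "rev-2 import sys\n"
#
# lower_fulltext()/lower_line_delta() produce these forms and
# parse_fulltext()/parse_line_delta() invert them.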
577
class KnitPlainFactory(_KnitFactory):
578
"""Factory for creating plain Content objects."""
582
def make(self, lines, version_id):
583
return PlainKnitContent(lines, version_id)
585
def parse_fulltext(self, content, version_id):
586
"""This parses an unannotated fulltext.
588
Note that this is not a noop - the internal representation
589
has (versionid, line) - it's just a constant versionid.
591
return self.make(content, version_id)
593
def parse_line_delta_iter(self, lines, version_id):
595
num_lines = len(lines)
596
while cur < num_lines:
599
start, end, c = [int(n) for n in header.split(',')]
600
yield start, end, c, lines[cur:cur+c]
603
def parse_line_delta(self, lines, version_id):
604
return list(self.parse_line_delta_iter(lines, version_id))
606
def get_fulltext_content(self, lines):
607
"""Extract just the content lines from a fulltext."""
610
def get_linedelta_content(self, lines):
611
"""Extract just the content from a line delta.
613
This doesn't return all of the extra information stored in a delta.
614
Only the actual content lines.
619
header = header.split(',')
620
count = int(header[2])
621
for i in xrange(count):
624
def lower_fulltext(self, content):
625
return content.text()
627
def lower_line_delta(self, delta):
629
for start, end, c, lines in delta:
630
out.append('%d,%d,%d\n' % (start, end, c))
634
def annotate(self, knit, key):
635
annotator = _KnitAnnotator(knit)
636
return annotator.annotate(key)
640
def make_file_factory(annotated, mapper):
641
"""Create a factory for creating a file based KnitVersionedFiles.
643
This is only functional enough to run interface tests, it doesn't try to
644
provide a full pack environment.
646
:param annotated: knit annotations are wanted.
647
:param mapper: The mapper from keys to paths.
649
def factory(transport):
650
index = _KndxIndex(transport, mapper, lambda:None, lambda:True, lambda:True)
651
access = _KnitKeyAccess(transport, mapper)
652
return KnitVersionedFiles(index, access, annotated=annotated)
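
# Illustrative usage sketch (assumptions: bzrlib.transport.get_transport and
# bzrlib.versionedfile.ConstantMapper are available, as in the test suite;
# 'test' is an arbitrary file name):
#
#   from bzrlib.transport import get_transport
#   from bzrlib.versionedfile import ConstantMapper
#
#   transport = get_transport('.')
#   factory = make_file_factory(annotated=True, mapper=ConstantMapper('test'))
#   knit_vf = factory(transport)
#   knit_vf.add_lines(('rev-1',), (), ['hello\n'])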
656
def make_pack_factory(graph, delta, keylength):
657
"""Create a factory for creating a pack based VersionedFiles.
659
This is only functional enough to run interface tests, it doesn't try to
660
provide a full pack environment.
662
:param graph: Store a graph.
663
:param delta: Delta compress contents.
664
:param keylength: How long should keys be.
666
def factory(transport):
667
parents = graph or delta
673
max_delta_chain = 200
676
graph_index = _mod_index.InMemoryGraphIndex(reference_lists=ref_length,
677
key_elements=keylength)
678
stream = transport.open_write_stream('newpack')
679
writer = pack.ContainerWriter(stream.write)
681
index = _KnitGraphIndex(graph_index, lambda:True, parents=parents,
682
deltas=delta, add_callback=graph_index.add_nodes)
683
access = _DirectPackAccess({})
684
access.set_writer(writer, graph_index, (transport, 'newpack'))
685
result = KnitVersionedFiles(index, access,
686
max_delta_chain=max_delta_chain)
687
result.stream = stream
688
result.writer = writer
693
def cleanup_pack_knit(versioned_files):
694
versioned_files.stream.close()
695
versioned_files.writer.end()
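
# Illustrative usage sketch (same caveats as above; 'a_transport' is assumed
# to be a writable bzrlib transport):
#
#   factory = make_pack_factory(graph=True, delta=True, keylength=1)
#   vf = factory(a_transport)
#   vf.add_lines(('rev-1',), (), ['hello\n'])
#   cleanup_pack_knit(vf)    # closes vf.stream and ends vf.writer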
698
class KnitVersionedFiles(VersionedFiles):
699
"""Storage for many versioned files using knit compression.
701
Backend storage is managed by indices and data objects.
703
:ivar _index: A _KnitGraphIndex or similar that can describe the
704
parents, graph, compression and data location of entries in this
705
KnitVersionedFiles. Note that this is only the index for
706
*this* vfs; if there are fallbacks they must be queried separately.
709
def __init__(self, index, data_access, max_delta_chain=200,
711
"""Create a KnitVersionedFiles with index and data_access.
713
:param index: The index for the knit data.
714
:param data_access: The access object to store and retrieve knit
716
:param max_delta_chain: The maximum number of deltas to permit during
717
insertion. Set to 0 to prohibit the use of deltas.
718
:param annotated: Set to True to cause annotations to be calculated and
719
stored during insertion.
722
self._access = data_access
723
self._max_delta_chain = max_delta_chain
725
self._factory = KnitAnnotateFactory()
727
self._factory = KnitPlainFactory()
728
self._fallback_vfs = []
731
return "%s(%r, %r)" % (
732
self.__class__.__name__,
736
def add_fallback_versioned_files(self, a_versioned_files):
737
"""Add a source of texts for texts not present in this knit.
739
:param a_versioned_files: A VersionedFiles object.
741
self._fallback_vfs.append(a_versioned_files)
743
def add_lines(self, key, parents, lines, parent_texts=None,
744
left_matching_blocks=None, nostore_sha=None, random_id=False,
746
"""See VersionedFiles.add_lines()."""
747
self._index._check_write_ok()
748
self._check_add(key, lines, random_id, check_content)
750
# The caller might pass None if there is no graph data, but kndx
751
# indexes can't directly store that, so we give them
752
# an empty tuple instead.
754
return self._add(key, lines, parents,
755
parent_texts, left_matching_blocks, nostore_sha, random_id)
757
def _add(self, key, lines, parents, parent_texts,
758
left_matching_blocks, nostore_sha, random_id):
759
"""Add a set of lines on top of version specified by parents.
761
Any versions not present will be converted into ghosts.
763
# first thing, if the content is something we don't need to store, find
765
line_bytes = ''.join(lines)
766
digest = sha_string(line_bytes)
767
if nostore_sha == digest:
768
raise errors.ExistingContent
771
if parent_texts is None:
773
# Do a single query to ascertain parent presence.
774
present_parent_map = self.get_parent_map(parents)
775
for parent in parents:
776
if parent in present_parent_map:
777
present_parents.append(parent)
779
# Currently we can only compress against the left most present parent.
780
if (len(present_parents) == 0 or
781
present_parents[0] != parents[0]):
784
# To speed the extract of texts the delta chain is limited
785
# to a fixed number of deltas. This should minimize both
786
# I/O and the time spent applying deltas.
787
delta = self._check_should_delta(present_parents[0])
789
text_length = len(line_bytes)
792
if lines[-1][-1] != '\n':
793
# copy the contents of lines.
795
options.append('no-eol')
796
lines[-1] = lines[-1] + '\n'
800
if type(element) != str:
801
raise TypeError("key contains non-strings: %r" % (key,))
802
# Knit hunks are still last-element only
804
content = self._factory.make(lines, version_id)
805
if 'no-eol' in options:
806
# Hint to the content object that its text() call should strip the
808
content._should_strip_eol = True
809
if delta or (self._factory.annotated and len(present_parents) > 0):
810
# Merge annotations from parent texts if needed.
811
delta_hunks = self._merge_annotations(content, present_parents,
812
parent_texts, delta, self._factory.annotated,
813
left_matching_blocks)
816
options.append('line-delta')
817
store_lines = self._factory.lower_line_delta(delta_hunks)
818
size, bytes = self._record_to_data(key, digest,
821
options.append('fulltext')
822
# isinstance is slower and we have no hierarchy.
823
if self._factory.__class__ == KnitPlainFactory:
824
# Use the already joined bytes saving iteration time in
826
size, bytes = self._record_to_data(key, digest,
829
# get mixed annotation + content and feed it into the
831
store_lines = self._factory.lower_fulltext(content)
832
size, bytes = self._record_to_data(key, digest,
835
access_memo = self._access.add_raw_records([(key, size)], bytes)[0]
836
self._index.add_records(
837
((key, options, access_memo, parents),),
839
return digest, text_length, content
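
# Illustrative sketch (not part of the class): adding a text on top of an
# existing one.  Keys are tuples; the kndx index cannot represent "no graph
# information", so an empty parents sequence is stored as ().
#
#   sha1, length, _ = vf.add_lines(('rev-2',), (('rev-1',),),
#                                  ['hello\n', 'world\n'])
#
# The returned sha1 and length describe the full text that was stored; the
# record itself may have been written as a delta against ('rev-1',),
# depending on _check_should_delta().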
841
def annotate(self, key):
842
"""See VersionedFiles.annotate."""
843
return self._factory.annotate(self, key)
845
def check(self, progress_bar=None):
846
"""See VersionedFiles.check()."""
847
# This doesn't actually test extraction of everything, but that will
848
# impact 'bzr check' substantially, and needs to be integrated with
849
# care. However, it does check for the obvious problem of a delta with
851
keys = self._index.keys()
852
parent_map = self.get_parent_map(keys)
854
if self._index.get_method(key) != 'fulltext':
855
compression_parent = parent_map[key][0]
856
if compression_parent not in parent_map:
857
raise errors.KnitCorrupt(self,
858
"Missing basis parent %s for %s" % (
859
compression_parent, key))
860
for fallback_vfs in self._fallback_vfs:
863
def _check_add(self, key, lines, random_id, check_content):
864
"""check that version_id and lines are safe to add."""
866
if contains_whitespace(version_id):
867
raise InvalidRevisionId(version_id, self)
868
self.check_not_reserved_id(version_id)
869
# TODO: If random_id==False and the key is already present, we should
870
# probably check that the existing content is identical to what is
871
# being inserted, and otherwise raise an exception. This would make
872
# the bundle code simpler.
874
self._check_lines_not_unicode(lines)
875
self._check_lines_are_lines(lines)
877
def _check_header(self, key, line):
878
rec = self._split_header(line)
879
self._check_header_version(rec, key[-1])
882
def _check_header_version(self, rec, version_id):
883
"""Checks the header version on original format knit records.
885
These have the last component of the key embedded in the record.
887
if rec[1] != version_id:
888
raise KnitCorrupt(self,
889
'unexpected version, wanted %r, got %r' % (version_id, rec[1]))
891
def _check_should_delta(self, parent):
892
"""Iterate back through the parent listing, looking for a fulltext.
894
This is used when we want to decide whether to add a delta or a new
895
fulltext. It searches for _max_delta_chain parents. When it finds a
896
fulltext parent, it sees if the total size of the deltas leading up to
897
it is large enough to indicate that we want a new full text anyway.
899
Return True if we should create a new delta, False if we should use a
904
for count in xrange(self._max_delta_chain):
905
# XXX: Collapse these two queries:
907
# Note that this only looks in the index of this particular
908
# KnitVersionedFiles, not in the fallbacks. This ensures that
909
# we won't store a delta spanning physical repository
911
method = self._index.get_method(parent)
912
except RevisionNotPresent:
913
# Some basis is not locally present: always delta
915
index, pos, size = self._index.get_position(parent)
916
if method == 'fulltext':
920
# We don't explicitly check for presence because this is in an
921
# inner loop, and if it's missing it'll fail anyhow.
922
# TODO: This should be asking for compression parent, not graph
924
parent = self._index.get_parent_map([parent])[parent][0]
926
# We couldn't find a fulltext, so we must create a new one
928
# Simple heuristic - if the total I/O would be greater as a delta than
929
# the originally installed fulltext, we create a new fulltext.
930
return fulltext_size > delta_size
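
# Illustrative example of the heuristic above: with a delta chain
# rev-1 (fulltext, 4000 bytes) <- rev-2 (delta, 500) <- rev-3 (delta, 600),
# adding rev-4 compares fulltext_size=4000 against the accumulated size of
# the deltas on the chain; while the deltas remain smaller than re-writing a
# fulltext we keep delta-compressing, otherwise a new fulltext is stored.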
932
def _build_details_to_components(self, build_details):
933
"""Convert a build_details tuple to a position tuple."""
934
# record_details, access_memo, compression_parent
935
return build_details[3], build_details[0], build_details[1]
937
def _get_components_positions(self, keys, allow_missing=False):
938
"""Produce a map of position data for the components of keys.
940
This data is intended to be used for retrieving the knit records.
942
A dict of key to (record_details, index_memo, next, parents) is
944
method is the way referenced data should be applied.
945
index_memo is the handle to pass to the data access to actually get the
947
next is the build-parent of the version, or None for fulltexts.
948
parents is the version_ids of the parents of this version
950
:param allow_missing: If True do not raise an error on a missing component,
954
pending_components = keys
955
while pending_components:
956
build_details = self._index.get_build_details(pending_components)
957
current_components = set(pending_components)
958
pending_components = set()
959
for key, details in build_details.iteritems():
960
(index_memo, compression_parent, parents,
961
record_details) = details
962
method = record_details[0]
963
if compression_parent is not None:
964
pending_components.add(compression_parent)
965
component_data[key] = self._build_details_to_components(details)
966
missing = current_components.difference(build_details)
967
if missing and not allow_missing:
968
raise errors.RevisionNotPresent(missing.pop(), self)
969
return component_data
971
def _get_content(self, key, parent_texts={}):
972
"""Returns a content object that makes up the specified
974
cached_version = parent_texts.get(key, None)
975
if cached_version is not None:
976
# Ensure the cache dict is valid.
977
if not self.get_parent_map([key]):
978
raise RevisionNotPresent(key, self)
979
return cached_version
980
text_map, contents_map = self._get_content_maps([key])
981
return contents_map[key]
983
def _get_content_maps(self, keys, nonlocal_keys=None):
984
"""Produce maps of text and KnitContents
986
:param keys: The keys to produce content maps for.
987
:param nonlocal_keys: An iterable of keys (possibly intersecting keys)
988
which are known to not be in this knit, but rather in one of the
990
:return: (text_map, content_map) where text_map contains the texts for
991
the requested versions and content_map contains the KnitContents.
993
# FUTURE: This function could be improved for the 'extract many' case
994
# by tracking each component and only doing the copy when the number of
995
# children that need to apply deltas to it is > 1 or it is part of the
998
multiple_versions = len(keys) != 1
999
record_map = self._get_record_map(keys, allow_missing=True)
1004
if nonlocal_keys is None:
1005
nonlocal_keys = set()
1007
nonlocal_keys = frozenset(nonlocal_keys)
1008
missing_keys = set(nonlocal_keys)
1009
for source in self._fallback_vfs:
1010
if not missing_keys:
1012
for record in source.get_record_stream(missing_keys,
1014
if record.storage_kind == 'absent':
1016
missing_keys.remove(record.key)
1017
lines = split_lines(record.get_bytes_as('fulltext'))
1018
text_map[record.key] = lines
1019
content_map[record.key] = PlainKnitContent(lines, record.key)
1020
if record.key in keys:
1021
final_content[record.key] = content_map[record.key]
1023
if key in nonlocal_keys:
1028
while cursor is not None:
1030
record, record_details, digest, next = record_map[cursor]
1032
raise RevisionNotPresent(cursor, self)
1033
components.append((cursor, record, record_details, digest))
1035
if cursor in content_map:
1036
# no need to plan further back
1037
components.append((cursor, None, None, None))
1041
for (component_id, record, record_details,
1042
digest) in reversed(components):
1043
if component_id in content_map:
1044
content = content_map[component_id]
1046
content, delta = self._factory.parse_record(key[-1],
1047
record, record_details, content,
1048
copy_base_content=multiple_versions)
1049
if multiple_versions:
1050
content_map[component_id] = content
1052
final_content[key] = content
1054
# digest here is the digest from the last applied component.
1055
text = content.text()
1056
actual_sha = sha_strings(text)
1057
if actual_sha != digest:
1058
raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
1059
text_map[key] = text
1060
return text_map, final_content
1062
def get_parent_map(self, keys):
1063
"""Get a map of the graph parents of keys.
1065
:param keys: The keys to look up parents for.
1066
:return: A mapping from keys to parents. Absent keys are absent from
1069
return self._get_parent_map_with_sources(keys)[0]
1071
def _get_parent_map_with_sources(self, keys):
1072
"""Get a map of the parents of keys.
1074
:param keys: The keys to look up parents for.
1075
:return: A tuple. The first element is a mapping from keys to parents.
1076
Absent keys are absent from the mapping. The second element is a
1077
list with the locations each key was found in. The first element
1078
is the in-this-knit parents, the second the first fallback source,
1082
sources = [self._index] + self._fallback_vfs
1085
for source in sources:
1088
new_result = source.get_parent_map(missing)
1089
source_results.append(new_result)
1090
result.update(new_result)
1091
missing.difference_update(set(new_result))
1092
return result, source_results
1094
def _get_record_map(self, keys, allow_missing=False):
1095
"""Produce a dictionary of knit records.
1097
:return: {key:(record, record_details, digest, next)}
1099
data returned from read_records
1101
opaque information to pass to parse_record
1103
SHA1 digest of the full text after all steps are done
1105
build-parent of the version, i.e. the leftmost ancestor.
1106
Will be None if the record is not a delta.
1107
:param keys: The keys to build a map for
1108
:param allow_missing: If some records are missing, rather than
1109
error, just return the data that could be generated.
1111
position_map = self._get_components_positions(keys,
1112
allow_missing=allow_missing)
1113
# key = component_id, r = record_details, i_m = index_memo, n = next
1114
records = [(key, i_m) for key, (r, i_m, n)
1115
in position_map.iteritems()]
1117
for key, record, digest in \
1118
self._read_records_iter(records):
1119
(record_details, index_memo, next) = position_map[key]
1120
record_map[key] = record, record_details, digest, next
1123
def _split_by_prefix(self, keys):
1124
"""For the given keys, split them up based on their prefix.
1126
To keep memory pressure somewhat under control, split the
1127
requests back into per-file-id requests, otherwise "bzr co"
1128
extracts the full tree into memory before writing it to disk.
1129
This should be revisited if _get_content_maps() can ever cross
1132
:param keys: An iterable of key tuples
1133
:return: A dict of {prefix: [key_list]}
1135
split_by_prefix = {}
1138
split_by_prefix.setdefault('', []).append(key)
1140
split_by_prefix.setdefault(key[0], []).append(key)
1141
return split_by_prefix
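
# Illustrative example (assuming, per the _KndxIndex docstring, that
# constant-mapped files use single-element keys, which get the '' prefix):
#
#   [('file-1', 'rev-a'), ('file-2', 'rev-b'), ('rev-c',)]
#
# splits into
#
#   {'file-1': [('file-1', 'rev-a')],
#    'file-2': [('file-2', 'rev-b')],
#    '':       [('rev-c',)]}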
1143
def get_record_stream(self, keys, ordering, include_delta_closure):
1144
"""Get a stream of records for keys.
1146
:param keys: The keys to include.
1147
:param ordering: Either 'unordered' or 'topological'. A topologically
1148
sorted stream has compression parents strictly before their
1150
:param include_delta_closure: If True then the closure across any
1151
compression parents will be included (in the opaque data).
1152
:return: An iterator of ContentFactory objects, each of which is only
1153
valid until the iterator is advanced.
1155
# keys might be a generator
1159
if not self._index.has_graph:
1160
# Cannot topological order when no graph has been stored.
1161
ordering = 'unordered'
1162
if include_delta_closure:
1163
positions = self._get_components_positions(keys, allow_missing=True)
1165
build_details = self._index.get_build_details(keys)
1167
# (record_details, access_memo, compression_parent_key)
1168
positions = dict((key, self._build_details_to_components(details))
1169
for key, details in build_details.iteritems())
1170
absent_keys = keys.difference(set(positions))
1171
# There may be more absent keys : if we're missing the basis component
1172
# and are trying to include the delta closure.
1173
if include_delta_closure:
1174
needed_from_fallback = set()
1175
# Build up reconstructable_keys dict. key:True in this dict means
1176
# the key can be reconstructed.
1177
reconstructable_keys = {}
1181
chain = [key, positions[key][2]]
1183
needed_from_fallback.add(key)
1186
while chain[-1] is not None:
1187
if chain[-1] in reconstructable_keys:
1188
result = reconstructable_keys[chain[-1]]
1192
chain.append(positions[chain[-1]][2])
1194
# missing basis component
1195
needed_from_fallback.add(chain[-1])
1198
for chain_key in chain[:-1]:
1199
reconstructable_keys[chain_key] = result
1201
needed_from_fallback.add(key)
1202
# Double index lookups here : need a unified api ?
1203
global_map, parent_maps = self._get_parent_map_with_sources(keys)
1204
if ordering == 'topological':
1205
# Global topological sort
1206
present_keys = tsort.topo_sort(global_map)
1207
# Now group by source:
1209
current_source = None
1210
for key in present_keys:
1211
for parent_map in parent_maps:
1212
if key in parent_map:
1213
key_source = parent_map
1215
if current_source is not key_source:
1216
source_keys.append((key_source, []))
1217
current_source = key_source
1218
source_keys[-1][1].append(key)
1220
if ordering != 'unordered':
1221
raise AssertionError('valid values for ordering are:'
1222
' "unordered" or "topological" not: %r'
1224
# Just group by source; remote sources first.
1227
for parent_map in reversed(parent_maps):
1228
source_keys.append((parent_map, []))
1229
for key in parent_map:
1230
present_keys.append(key)
1231
source_keys[-1][1].append(key)
1232
absent_keys = keys - set(global_map)
1233
for key in absent_keys:
1234
yield AbsentContentFactory(key)
1235
# restrict our view to the keys we can answer.
1236
# XXX: Memory: TODO: batch data here to cap buffered data at (say) 1MB.
1237
# XXX: At that point we need to consider the impact of double reads by
1238
# utilising components multiple times.
1239
if include_delta_closure:
1240
# XXX: get_content_maps performs its own index queries; allow state
1242
non_local_keys = needed_from_fallback - absent_keys
1243
prefix_split_keys = self._split_by_prefix(present_keys)
1244
prefix_split_non_local_keys = self._split_by_prefix(non_local_keys)
1245
for prefix, keys in prefix_split_keys.iteritems():
1246
non_local = prefix_split_non_local_keys.get(prefix, [])
1247
non_local = set(non_local)
1248
text_map, _ = self._get_content_maps(keys, non_local)
1250
lines = text_map.pop(key)
1251
text = ''.join(lines)
1252
yield FulltextContentFactory(key, global_map[key], None,
1255
for source, keys in source_keys:
1256
if source is parent_maps[0]:
1257
# this KnitVersionedFiles
1258
records = [(key, positions[key][1]) for key in keys]
1259
for key, raw_data, sha1 in self._read_records_iter_raw(records):
1260
(record_details, index_memo, _) = positions[key]
1261
yield KnitContentFactory(key, global_map[key],
1262
record_details, sha1, raw_data, self._factory.annotated, None)
1264
vf = self._fallback_vfs[parent_maps.index(source) - 1]
1265
for record in vf.get_record_stream(keys, ordering,
1266
include_delta_closure):
1269
def get_sha1s(self, keys):
1270
"""See VersionedFiles.get_sha1s()."""
1272
record_map = self._get_record_map(missing, allow_missing=True)
1274
for key, details in record_map.iteritems():
1275
if key not in missing:
1277
# record entry 2 is the 'digest'.
1278
result[key] = details[2]
1279
missing.difference_update(set(result))
1280
for source in self._fallback_vfs:
1283
new_result = source.get_sha1s(missing)
1284
result.update(new_result)
1285
missing.difference_update(set(new_result))
1288
def insert_record_stream(self, stream):
1289
"""Insert a record stream into this container.
1291
:param stream: A stream of records to insert.
1293
:seealso VersionedFiles.get_record_stream:
1295
def get_adapter(adapter_key):
1297
return adapters[adapter_key]
1299
adapter_factory = adapter_registry.get(adapter_key)
1300
adapter = adapter_factory(self)
1301
adapters[adapter_key] = adapter
1303
if self._factory.annotated:
1304
# self is annotated, we need annotated knits to use directly.
1305
annotated = "annotated-"
1308
# self is not annotated, but we can strip annotations cheaply.
1310
convertibles = set(["knit-annotated-ft-gz"])
1311
if self._max_delta_chain:
1312
convertibles.add("knit-annotated-delta-gz")
1313
# The set of types we can cheaply adapt without needing basis texts.
1314
native_types = set()
1315
if self._max_delta_chain:
1316
native_types.add("knit-%sdelta-gz" % annotated)
1317
native_types.add("knit-%sft-gz" % annotated)
1318
knit_types = native_types.union(convertibles)
1320
# Buffer all index entries that we can't add immediately because their
1321
# basis parent is missing. We don't buffer all because generating
1322
# annotations may require access to some of the new records. However we
1323
# can't generate annotations from new deltas until their basis parent
1324
# is present anyway, so we get away with not needing an index that
1325
# includes the new keys.
1326
# key = basis_parent, value = index entry to add
1327
buffered_index_entries = {}
1328
for record in stream:
1329
parents = record.parents
1330
# Raise an error when a record is missing.
1331
if record.storage_kind == 'absent':
1332
raise RevisionNotPresent([record.key], self)
1333
if record.storage_kind in knit_types:
1334
if record.storage_kind not in native_types:
1336
adapter_key = (record.storage_kind, "knit-delta-gz")
1337
adapter = get_adapter(adapter_key)
1339
adapter_key = (record.storage_kind, "knit-ft-gz")
1340
adapter = get_adapter(adapter_key)
1341
bytes = adapter.get_bytes(
1342
record, record.get_bytes_as(record.storage_kind))
1344
bytes = record.get_bytes_as(record.storage_kind)
1345
options = [record._build_details[0]]
1346
if record._build_details[1]:
1347
options.append('no-eol')
1348
# Just blat it across.
1349
# Note: This does end up adding data on duplicate keys. As
1350
# modern repositories use atomic insertions this should not
1351
# lead to excessive growth in the event of interrupted fetches.
1352
# 'knit' repositories may suffer excessive growth, but as a
1353
# deprecated format this is tolerable. It can be fixed if
1354
# needed by the kndx index support raising on a duplicate
1355
# add with identical parents and options.
1356
access_memo = self._access.add_raw_records(
1357
[(record.key, len(bytes))], bytes)[0]
1358
index_entry = (record.key, options, access_memo, parents)
1360
if 'fulltext' not in options:
1361
basis_parent = parents[0]
1362
# Note that pack backed knits don't need to buffer here
1363
# because they buffer all writes to the transaction level,
1364
# but we don't expose that difference at the index level. If
1365
# the query here has sufficient cost to show up in
1366
# profiling we should do that.
1367
if basis_parent not in self.get_parent_map([basis_parent]):
1368
pending = buffered_index_entries.setdefault(
1370
pending.append(index_entry)
1373
self._index.add_records([index_entry])
1374
elif record.storage_kind == 'fulltext':
1375
self.add_lines(record.key, parents,
1376
split_lines(record.get_bytes_as('fulltext')))
1378
adapter_key = record.storage_kind, 'fulltext'
1379
adapter = get_adapter(adapter_key)
1380
lines = split_lines(adapter.get_bytes(
1381
record, record.get_bytes_as(record.storage_kind)))
1383
self.add_lines(record.key, parents, lines)
1384
except errors.RevisionAlreadyPresent:
1386
# Add any records whose basis parent is now available.
1387
added_keys = [record.key]
1389
key = added_keys.pop(0)
1390
if key in buffered_index_entries:
1391
index_entries = buffered_index_entries[key]
1392
self._index.add_records(index_entries)
1394
[index_entry[0] for index_entry in index_entries])
1395
del buffered_index_entries[key]
1396
# If there were any deltas which had a missing basis parent, error.
1397
if buffered_index_entries:
1398
raise errors.RevisionNotPresent(buffered_index_entries.keys()[0],
1401
def iter_lines_added_or_present_in_keys(self, keys, pb=None):
1402
"""Iterate over the lines in the versioned files from keys.
1404
This may return lines from other keys. Each item the returned
1405
iterator yields is a tuple of a line and a text version that that line
1406
is present in (not introduced in).
1408
Ordering of results is in whatever order is most suitable for the
1409
underlying storage format.
1411
If a progress bar is supplied, it may be used to indicate progress.
1412
The caller is responsible for cleaning up progress bars (because this
1416
* Lines are normalised by the underlying store: they will all have \n
1418
* Lines are returned in arbitrary order.
1420
:return: An iterator over (line, key).
1423
pb = progress.DummyProgress()
1426
# we don't care about inclusions, the caller cares.
1427
# but we need to setup a list of records to visit.
1428
# we need key, position, length
1430
build_details = self._index.get_build_details(keys)
1431
for key, details in build_details.iteritems():
1433
key_records.append((key, details[0]))
1435
records_iter = enumerate(self._read_records_iter(key_records))
1436
for (key_idx, (key, data, sha_value)) in records_iter:
1437
pb.update('Walking content.', key_idx, total)
1438
compression_parent = build_details[key][1]
1439
if compression_parent is None:
1441
line_iterator = self._factory.get_fulltext_content(data)
1444
line_iterator = self._factory.get_linedelta_content(data)
1445
# XXX: It might be more efficient to yield (key,
1446
# line_iterator) in the future. However for now, this is a simpler
1447
# change to integrate into the rest of the codebase. RBC 20071110
1448
for line in line_iterator:
1450
for source in self._fallback_vfs:
1454
for line, key in source.iter_lines_added_or_present_in_keys(keys):
1455
source_keys.add(key)
1457
keys.difference_update(source_keys)
1459
# XXX: strictly the second parameter is meant to be the file id
1460
# but it's not easily accessible here.
1461
raise RevisionNotPresent(keys, repr(self))
1462
pb.update('Walking content.', total, total)
1464
def _make_line_delta(self, delta_seq, new_content):
1465
"""Generate a line delta from delta_seq and new_content."""
1467
for op in delta_seq.get_opcodes():
1468
if op[0] == 'equal':
1470
diff_hunks.append((op[1], op[2], op[4]-op[3], new_content._lines[op[3]:op[4]]))
1473
def _merge_annotations(self, content, parents, parent_texts={},
1474
delta=None, annotated=None,
1475
left_matching_blocks=None):
1476
"""Merge annotations for content and generate deltas.
1478
This is done by comparing the annotations based on changes to the text
1479
and generating a delta on the resulting full texts. If annotations are
1480
not being created then a simple delta is created.
1482
if left_matching_blocks is not None:
1483
delta_seq = diff._PrematchedMatcher(left_matching_blocks)
1487
for parent_key in parents:
1488
merge_content = self._get_content(parent_key, parent_texts)
1489
if (parent_key == parents[0] and delta_seq is not None):
1492
seq = patiencediff.PatienceSequenceMatcher(
1493
None, merge_content.text(), content.text())
1494
for i, j, n in seq.get_matching_blocks():
1497
# this copies (origin, text) pairs across to the new
1498
# content for any line that matches the last-checked
1500
content._lines[j:j+n] = merge_content._lines[i:i+n]
1501
# XXX: Robert says the following block is a workaround for a
1502
# now-fixed bug and it can probably be deleted. -- mbp 20080618
1503
if content._lines and content._lines[-1][1][-1] != '\n':
1504
# The copied annotation was from a line without a trailing EOL,
1505
# reinstate one for the content object, to ensure correct
1507
line = content._lines[-1][1] + '\n'
1508
content._lines[-1] = (content._lines[-1][0], line)
1510
if delta_seq is None:
1511
reference_content = self._get_content(parents[0], parent_texts)
1512
new_texts = content.text()
1513
old_texts = reference_content.text()
1514
delta_seq = patiencediff.PatienceSequenceMatcher(
1515
None, old_texts, new_texts)
1516
return self._make_line_delta(delta_seq, content)
1518
def _parse_record(self, version_id, data):
1519
"""Parse an original format knit record.
1521
These have only the last element of the key present in the stored data.
1523
rec, record_contents = self._parse_record_unchecked(data)
1524
self._check_header_version(rec, version_id)
1525
return record_contents, rec[3]
1527
def _parse_record_header(self, key, raw_data):
1528
"""Parse a record header for consistency.
1530
:return: the header and the decompressor stream.
1531
as (stream, header_record)
1533
df = tuned_gzip.GzipFile(mode='rb', fileobj=StringIO(raw_data))
1536
rec = self._check_header(key, df.readline())
1537
except Exception, e:
1538
raise KnitCorrupt(self,
1539
"While reading {%s} got %s(%s)"
1540
% (key, e.__class__.__name__, str(e)))
1543
def _parse_record_unchecked(self, data):
1545
# 4168 calls in 2880 217 internal
1546
# 4168 calls to _parse_record_header in 2121
1547
# 4168 calls to readlines in 330
1548
df = tuned_gzip.GzipFile(mode='rb', fileobj=StringIO(data))
1550
record_contents = df.readlines()
1551
except Exception, e:
1552
raise KnitCorrupt(self, "Corrupt compressed record %r, got %s(%s)" %
1553
(data, e.__class__.__name__, str(e)))
1554
header = record_contents.pop(0)
1555
rec = self._split_header(header)
1556
last_line = record_contents.pop()
1557
if len(record_contents) != int(rec[2]):
1558
raise KnitCorrupt(self,
1559
'incorrect number of lines %s != %s'
1560
' for version {%s} %s'
1561
% (len(record_contents), int(rec[2]),
1562
rec[1], record_contents))
1563
if last_line != 'end %s\n' % rec[1]:
1564
raise KnitCorrupt(self,
1565
'unexpected version end line %r, wanted %r'
1566
% (last_line, rec[1]))
1568
return rec, record_contents
1570
def _read_records_iter(self, records):
1571
"""Read text records from data file and yield result.
1573
The result will be returned in whatever order is fastest to read,
not the order requested. Also, multiple requests for the same
1575
record will only yield 1 response.
1576
:param records: A list of (key, access_memo) entries
1577
:return: Yields (key, contents, digest) in the order
1578
read, not the order requested
1583
# XXX: This smells wrong, IO may not be getting ordered right.
1584
needed_records = sorted(set(records), key=operator.itemgetter(1))
1585
if not needed_records:
1588
# The transport optimizes the fetching as well
1589
# (ie, reads continuous ranges.)
1590
raw_data = self._access.get_raw_records(
1591
[index_memo for key, index_memo in needed_records])
1593
for (key, index_memo), data in \
1594
izip(iter(needed_records), raw_data):
1595
content, digest = self._parse_record(key[-1], data)
1596
yield key, content, digest
1598
def _read_records_iter_raw(self, records):
1599
"""Read text records from data file and yield raw data.
1601
This unpacks enough of the text record to validate the id is
1602
as expected, but that's all.
1604
Each item the iterator yields is (key, bytes, sha1_of_full_text).
1606
# setup an iterator of the external records:
1607
# uses readv so nice and fast we hope.
1609
# grab the disk data needed.
1610
needed_offsets = [index_memo for key, index_memo
1612
raw_records = self._access.get_raw_records(needed_offsets)
1614
for key, index_memo in records:
1615
data = raw_records.next()
1616
# validate the header (note that we can only use the suffix in
1617
# current knit records).
1618
df, rec = self._parse_record_header(key, data)
1620
yield key, data, rec[3]
1622
def _record_to_data(self, key, digest, lines, dense_lines=None):
1623
"""Convert key, digest, lines into a raw data block.
1625
:param key: The key of the record. Currently keys are always serialised
1626
using just the trailing component.
1627
:param dense_lines: The bytes of lines but in a denser form. For
1628
instance, if lines is a list of 1000 bytestrings each ending in \n,
1629
dense_lines may be a list with one line in it, containing all the
1630
1000 lines and their \n's. Using dense_lines if it is already
1631
known is a win because the string join to create bytes in this
1632
function spends less time resizing the final string.
1633
:return: (len, a string with the raw data ready to read.)
1635
# Note: using a string copy here increases memory pressure with e.g.
1636
# ISO's, but it is about 3 seconds faster on a 1.2Ghz intel machine
1637
# when doing the initial commit of a mozilla tree. RBC 20070921
1638
bytes = ''.join(chain(
1639
["version %s %d %s\n" % (key[-1],
1642
dense_lines or lines,
1643
["end %s\n" % key[-1]]))
1644
if type(bytes) != str:
1645
raise AssertionError(
1646
'data must be plain bytes was %s' % type(bytes))
1647
if lines and lines[-1][-1] != '\n':
1648
raise ValueError('corrupt lines value %r' % lines)
1649
compressed_bytes = tuned_gzip.bytes_to_gzip(bytes)
1650
return len(compressed_bytes), compressed_bytes
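
# Illustrative example: before gzip compression the record produced above
# looks like
#
#   version rev-1 2 <sha1 of the full text>
#   hello
#   world
#   end rev-1
#
# (each line newline-terminated); the returned length is that of the
# compressed bytes actually written to the .knit data file.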
1652
def _split_header(self, line):
1655
raise KnitCorrupt(self,
1656
'unexpected number of elements in record header')
1660
"""See VersionedFiles.keys."""
1661
if 'evil' in debug.debug_flags:
1662
trace.mutter_callsite(2, "keys scales with size of history")
1663
sources = [self._index] + self._fallback_vfs
1665
for source in sources:
1666
result.update(source.keys())
1670
class _KndxIndex(object):
1671
"""Manages knit index files
1673
The index is kept in memory and read on startup, to enable
1674
fast lookups of revision information. The cursor of the index
1675
file is always pointing to the end, making it easy to append
1678
_cache is a cache for fast mapping from version id to an Index
1681
_history is a cache for fast mapping from indexes to version ids.
1683
The index data format is dictionary compressed when it comes to
parent references; an index entry may only have parents with a
lower index number. As a result, the index is topologically sorted.
1687
Duplicate entries may be written to the index for a single version id;
1688
if this is done then the latter one completely replaces the former:
1689
this allows updates to correct version and parent information.
1690
Note that the two entries may share the delta, and that successive
1691
annotations and references MUST point to the first entry.
1693
The index file on disc contains a header, followed by one line per knit
1694
record. The same revision can be present in an index file more than once.
1695
The first occurrence gets assigned a sequence number starting from 0.
1697
The format of a single line is
1698
REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
1699
REVISION_ID is a utf8-encoded revision id
1700
FLAGS is a comma separated list of flags about the record. Values include
1701
no-eol, line-delta, fulltext.
1702
BYTE_OFFSET is the ascii representation of the byte offset in the data file
1703
that the compressed data starts at.
1704
LENGTH is the ascii representation of the length of the data file.
1705
PARENT_ID a utf-8 revision id prefixed by a '.' that is a parent of
1707
PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
1708
revision id already in the knit that is a parent of REVISION_ID.
1709
The ' :' marker is the end of record marker.
1712
When a write to the index file is interrupted, it will result in a line
1713
that does not end in ' :'. If the ' :' is not present at the end of a line,
1714
or at the end of the file, then the record that is missing it will be
1715
ignored by the parser.
1717
When writing new records to the index file, the data is preceded by '\n'
1718
to ensure that records always start on new lines even if the last write was
1719
interrupted. As a result it's normal for the last line in the index to be
1720
missing a trailing newline. One can be added with no harmful effects.
1722
:ivar _kndx_cache: dict from prefix to the old state of KnitIndex objects,
1723
where prefix is e.g. the (fileid,) for .texts instances or () for
1724
constant-mapped things like .revisions, and the old state is
1725
tuple(history_vector, cache_dict). This is used to prevent having an
1726
ABI change with the C extension that reads .kndx files.
1729
HEADER = "# bzr knit index 8\n"
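
# Illustrative example of an index file in this format (revision ids
# shortened): the header above, then one line per record, with parents given
# either as the sequence number of an earlier line or as a '.'-prefixed
# revision id, and ' :' as the end-of-record marker:
#
#   # bzr knit index 8
#   rev-1 fulltext 0 120  :
#   rev-2 line-delta 120 75 0 :
#   rev-3 line-delta,no-eol 195 80 1 .rev-0 :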
1731
def __init__(self, transport, mapper, get_scope, allow_writes, is_locked):
1732
"""Create a _KndxIndex on transport using mapper."""
1733
self._transport = transport
1734
self._mapper = mapper
1735
self._get_scope = get_scope
1736
self._allow_writes = allow_writes
1737
self._is_locked = is_locked
1739
self.has_graph = True
1741
def add_records(self, records, random_id=False):
1742
"""Add multiple records to the index.
1744
:param records: a list of tuples:
1745
(key, options, access_memo, parents).
1746
:param random_id: If True the ids being added were randomly generated
1747
and no check for existence will be performed.
1750
for record in records:
1753
path = self._mapper.map(key) + '.kndx'
1754
path_keys = paths.setdefault(path, (prefix, []))
1755
path_keys[1].append(record)
1756
for path in sorted(paths):
1757
prefix, path_keys = paths[path]
1758
self._load_prefixes([prefix])
1760
orig_history = self._kndx_cache[prefix][1][:]
1761
orig_cache = self._kndx_cache[prefix][0].copy()
1764
for key, options, (_, pos, size), parents in path_keys:
1766
# kndx indices cannot be parentless.
1768
line = "\n%s %s %s %s %s :" % (
1769
key[-1], ','.join(options), pos, size,
1770
self._dictionary_compress(parents))
1771
if type(line) != str:
1772
raise AssertionError(
1773
'data must be utf8 was %s' % type(line))
1775
self._cache_key(key, options, pos, size, parents)
1776
if len(orig_history):
1777
self._transport.append_bytes(path, ''.join(lines))
1779
self._init_index(path, lines)
1781
# If any problems happen, restore the original values and re-raise
1782
self._kndx_cache[prefix] = (orig_cache, orig_history)
1785
def _cache_key(self, key, options, pos, size, parent_keys):
1786
"""Cache a version record in the history array and index cache.
1788
This is inlined into _load_data for performance. KEEP IN SYNC.
1789
(It saves 60ms, 25% of the __init__ overhead on local 4000 record
1793
version_id = key[-1]
1794
# last-element only for compatibility with the C load_data.
1795
parents = tuple(parent[-1] for parent in parent_keys)
1796
for parent in parent_keys:
1797
if parent[:-1] != prefix:
1798
raise ValueError("mismatched prefixes for %r, %r" % (
1800
cache, history = self._kndx_cache[prefix]
1801
# only want the _history index to reference the 1st index entry
1803
if version_id not in cache:
1804
index = len(history)
1805
history.append(version_id)
1807
index = cache[version_id][5]
1808
cache[version_id] = (version_id,
1815
def check_header(self, fp):
1816
line = fp.readline()
if line == '':
    # An empty file can actually be treated as though the file doesn't
    # exist yet.
    raise errors.NoSuchFile(self)
1821
if line != self.HEADER:
1822
raise KnitHeaderError(badline=line, filename=self)
1824
def _check_read(self):
1825
if not self._is_locked():
1826
raise errors.ObjectNotLocked(self)
1827
if self._get_scope() != self._scope:
1830
def _check_write_ok(self):
1831
"""Assert if not writes are permitted."""
1832
if not self._is_locked():
1833
raise errors.ObjectNotLocked(self)
1834
if self._get_scope() != self._scope:
1836
if self._mode != 'w':
1837
raise errors.ReadOnlyObjectDirtiedError(self)
1839
def get_build_details(self, keys):
1840
"""Get the method, index_memo and compression parent for keys.
1842
Ghosts are omitted from the result.
1844
:param keys: An iterable of keys.
:return: A dict of key:(index_memo, compression_parent, parents,
    record_details).
    index_memo
        opaque structure to pass to read_records to extract the raw data
    compression_parent
        Content that this record is built upon, may be None
    parents
        Logical parents of this node
    record_details
        extra information about the content which needs to be passed to
        Factory.parse_record
"""
1858
prefixes = self._partition_keys(keys)
1859
parent_map = self.get_parent_map(keys)
1862
if key not in parent_map:
1864
method = self.get_method(key)
1865
parents = parent_map[key]
1866
if method == 'fulltext':
1867
compression_parent = None
1869
compression_parent = parents[0]
1870
noeol = 'no-eol' in self.get_options(key)
1871
index_memo = self.get_position(key)
1872
result[key] = (index_memo, compression_parent,
1873
parents, (method, noeol))
1876
def get_method(self, key):
1877
"""Return compression method of specified key."""
1878
options = self.get_options(key)
1879
if 'fulltext' in options:
1881
elif 'line-delta' in options:
1884
raise errors.KnitIndexUnknownMethod(self, options)
1886
def get_options(self, key):
1887
"""Return a list representing options.
1891
prefix, suffix = self._split_key(key)
1892
self._load_prefixes([prefix])
1894
return self._kndx_cache[prefix][0][suffix][1]
1896
raise RevisionNotPresent(key, self)
1898
def get_parent_map(self, keys):
1899
"""Get a map of the parents of keys.
1901
:param keys: The keys to look up parents for.
1902
:return: A mapping from keys to parents. Absent keys are absent from
1905
# Parse what we need to up front, this potentially trades off I/O
1906
# locality (.kndx and .knit in the same block group for the same file
1907
# id) for less checking in inner loops.
1908
prefixes = set(key[:-1] for key in keys)
1909
self._load_prefixes(prefixes)
1914
suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
1918
result[key] = tuple(prefix + (suffix,) for
1919
suffix in suffix_parents)
1922
def get_position(self, key):
1923
"""Return details needed to access the version.
1925
:return: a tuple (key, data position, size) to hand to the access
1926
logic to get the record.
1928
prefix, suffix = self._split_key(key)
1929
self._load_prefixes([prefix])
1930
entry = self._kndx_cache[prefix][0][suffix]
1931
return key, entry[2], entry[3]
1933
def _init_index(self, path, extra_lines=[]):
1934
"""Initialize an index."""
1936
sio.write(self.HEADER)
1937
sio.writelines(extra_lines)
1939
self._transport.put_file_non_atomic(path, sio,
1940
create_parent_dir=True)
1941
# self._create_parent_dir)
1942
# mode=self._file_mode,
1943
# dir_mode=self._dir_mode)
1946
"""Get all the keys in the collection.
1948
The keys are not ordered.
1951
# Identify all key prefixes.
1952
# XXX: A bit hacky, needs polish.
1953
if type(self._mapper) == ConstantMapper:
1957
for quoted_relpath in self._transport.iter_files_recursive():
1958
path, ext = os.path.splitext(quoted_relpath)
1960
prefixes = [self._mapper.unmap(path) for path in relpaths]
1961
self._load_prefixes(prefixes)
1962
for prefix in prefixes:
1963
for suffix in self._kndx_cache[prefix][1]:
1964
result.add(prefix + (suffix,))
1967
def _load_prefixes(self, prefixes):
1968
"""Load the indices for prefixes."""
1970
for prefix in prefixes:
1971
if prefix not in self._kndx_cache:
1972
# the load_data interface writes to these variables.
1975
self._filename = prefix
1977
path = self._mapper.map(prefix) + '.kndx'
1978
fp = self._transport.get(path)
1980
# _load_data may raise NoSuchFile if the target knit is
1982
_load_data(self, fp)
1985
self._kndx_cache[prefix] = (self._cache, self._history)
1990
self._kndx_cache[prefix] = ({}, [])
1991
if type(self._mapper) == ConstantMapper:
1992
# preserve behaviour for revisions.kndx etc.
1993
self._init_index(path)
1998
def _partition_keys(self, keys):
1999
"""Turn keys into a dict of prefix:suffix_list."""
2002
prefix_keys = result.setdefault(key[:-1], [])
2003
prefix_keys.append(key[-1])
2006
def _dictionary_compress(self, keys):
2007
"""Dictionary compress keys.
2009
:param keys: The keys to generate references to.
2010
:return: A string representation of keys. Keys which are present are
    dictionary compressed, and others are emitted as fulltext with a
    '.' prefix.
"""
2017
prefix = keys[0][:-1]
2018
cache = self._kndx_cache[prefix][0]
2020
if key[:-1] != prefix:
2021
# kndx indices cannot refer across partitioned storage.
2022
raise ValueError("mismatched prefixes for %r" % keys)
2023
if key[-1] in cache:
2024
# -- inlined lookup() --
2025
result_list.append(str(cache[key[-1]][5]))
2026
# -- end lookup () --
2028
result_list.append('.' + key[-1])
2029
return ' '.join(result_list)
2031
def _reset_cache(self):
2032
# Possibly this should be a LRU cache. A dictionary from key_prefix to
2033
# (cache_dict, history_vector) for parsed kndx files.
2034
self._kndx_cache = {}
2035
self._scope = self._get_scope()
2036
allow_writes = self._allow_writes()
2042
def _split_key(self, key):
2043
"""Split key into a prefix and suffix."""
2044
return key[:-1], key[-1]
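
# The helper below is NOT part of the knit implementation; it is a small,
# self-contained sketch (hypothetical names throughout) of the dictionary
# compression idea used by _KndxIndex._dictionary_compress above: parents
# already present in the index are written as their sequence number, while
# unknown parents are written as a '.'-prefixed revision id.

def _example_compress_parents(sequence_numbers, parent_ids):
    """Illustrative only: render the parent field of one index line.

    :param sequence_numbers: dict mapping revision id -> sequence number of
        records already written to the index.
    :param parent_ids: iterable of parent revision ids for the new record.
    :return: the space-separated parent references for the index line.
    """
    refs = []
    for parent_id in parent_ids:
        if parent_id in sequence_numbers:
            # dictionary compressed reference to an earlier entry
            refs.append(str(sequence_numbers[parent_id]))
        else:
            # parent not in the index (e.g. a ghost): emit the full id
            refs.append('.' + parent_id)
    return ' '.join(refs)

# e.g. _example_compress_parents({'rev-a': 0, 'rev-b': 1}, ['rev-b', 'rev-x'])
# returns '1 .rev-x'.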
2047
class _KnitGraphIndex(object):
2048
"""A KnitVersionedFiles index layered on GraphIndex."""
2050
def __init__(self, graph_index, is_locked, deltas=False, parents=True,
2052
"""Construct a KnitGraphIndex on a graph_index.
2054
:param graph_index: An implementation of bzrlib.index.GraphIndex.
2055
:param is_locked: A callback to check whether the object should answer
2057
:param deltas: Allow delta-compressed records.
2058
:param parents: If True, record knits parents, if not do not record
2060
:param add_callback: If not None, allow additions to the index and call
2061
this callback with a list of added GraphIndex nodes:
2062
[(node, value, node_refs), ...]
2063
:param is_locked: A callback, returns True if the index is locked and
2066
self._add_callback = add_callback
2067
self._graph_index = graph_index
2068
self._deltas = deltas
2069
self._parents = parents
2070
if deltas and not parents:
2071
# XXX: TODO: Delta tree and parent graph should be conceptually
2073
raise KnitCorrupt(self, "Cannot do delta compression without "
2075
self.has_graph = parents
2076
self._is_locked = is_locked
2079
return "%s(%r)" % (self.__class__.__name__, self._graph_index)
2081
def add_records(self, records, random_id=False):
2082
"""Add multiple records to the index.
2084
This function does not insert data into the Immutable GraphIndex
2085
backing the KnitGraphIndex, instead it prepares data for insertion by
2086
the caller and checks that it is safe to insert then calls
2087
self._add_callback with the prepared GraphIndex nodes.
2089
:param records: a list of tuples:
2090
(key, options, access_memo, parents).
2091
:param random_id: If True the ids being added were randomly generated
2092
and no check for existence will be performed.
2094
if not self._add_callback:
2095
raise errors.ReadOnlyError(self)
2096
# we hope there are no repositories with inconsistent parentage
2100
for (key, options, access_memo, parents) in records:
2102
parents = tuple(parents)
2103
index, pos, size = access_memo
2104
if 'no-eol' in options:
2108
value += "%d %d" % (pos, size)
2109
if not self._deltas:
2110
if 'line-delta' in options:
2111
raise KnitCorrupt(self, "attempt to add line-delta in non-delta knit")
2114
if 'line-delta' in options:
2115
node_refs = (parents, (parents[0],))
2117
node_refs = (parents, ())
2119
node_refs = (parents, )
2122
raise KnitCorrupt(self, "attempt to add node with parents "
2123
"in parentless index.")
2125
keys[key] = (value, node_refs)
2128
present_nodes = self._get_entries(keys)
2129
for (index, key, value, node_refs) in present_nodes:
2130
if (value[0] != keys[key][0][0] or
2131
node_refs != keys[key][1]):
2132
raise KnitCorrupt(self, "inconsistent details in add_records"
2133
": %s %s" % ((value, node_refs), keys[key]))
2137
for key, (value, node_refs) in keys.iteritems():
2138
result.append((key, value, node_refs))
2140
for key, (value, node_refs) in keys.iteritems():
2141
result.append((key, value))
2142
self._add_callback(result)
2144
def _check_read(self):
2145
"""raise if reads are not permitted."""
2146
if not self._is_locked():
2147
raise errors.ObjectNotLocked(self)
2149
def _check_write_ok(self):
2150
"""Assert if writes are not permitted."""
2151
if not self._is_locked():
2152
raise errors.ObjectNotLocked(self)
2154
def _compression_parent(self, an_entry):
2155
# return the key that an_entry is compressed against, or None
2156
# Grab the second parent list (as deltas implies parents currently)
2157
compression_parents = an_entry[3][1]
2158
if not compression_parents:
2160
if len(compression_parents) != 1:
2161
raise AssertionError(
2162
"Too many compression parents: %r" % compression_parents)
2163
return compression_parents[0]
2165
def get_build_details(self, keys):
2166
"""Get the method, index_memo and compression parent for version_ids.
2168
Ghosts are omitted from the result.
2170
:param keys: An iterable of keys.
:return: A dict of key:
    (index_memo, compression_parent, parents, record_details).
    index_memo
        opaque structure to pass to read_records to extract the raw data
    compression_parent
        Content that this record is built upon, may be None
    parents
        Logical parents of this node
    record_details
        extra information about the content which needs to be passed to
        Factory.parse_record
"""
2186
entries = self._get_entries(keys, False)
2187
for entry in entries:
2189
if not self._parents:
2192
parents = entry[3][0]
2193
if not self._deltas:
2194
compression_parent_key = None
2196
compression_parent_key = self._compression_parent(entry)
2197
noeol = (entry[2][0] == 'N')
2198
if compression_parent_key:
2199
method = 'line-delta'
2202
result[key] = (self._node_to_position(entry),
2203
compression_parent_key, parents,
2207
def _get_entries(self, keys, check_present=False):
2208
"""Get the entries for keys.
2210
:param keys: An iterable of index key tuples.
2215
for node in self._graph_index.iter_entries(keys):
2217
found_keys.add(node[1])
2219
# adapt parentless index to the rest of the code.
2220
for node in self._graph_index.iter_entries(keys):
2221
yield node[0], node[1], node[2], ()
2222
found_keys.add(node[1])
2224
missing_keys = keys.difference(found_keys)
2226
raise RevisionNotPresent(missing_keys.pop(), self)
2228
def get_method(self, key):
2229
"""Return compression method of specified key."""
2230
return self._get_method(self._get_node(key))
2232
def _get_method(self, node):
2233
if not self._deltas:
2235
if self._compression_parent(node):
2240
def _get_node(self, key):
2242
return list(self._get_entries([key]))[0]
2244
raise RevisionNotPresent(key, self)
2246
def get_options(self, key):
2247
"""Return a list representing options.
2251
node = self._get_node(key)
2252
options = [self._get_method(node)]
2253
if node[2][0] == 'N':
2254
options.append('no-eol')
2257
def get_parent_map(self, keys):
2258
"""Get a map of the parents of keys.
2260
:param keys: The keys to look up parents for.
2261
:return: A mapping from keys to parents. Absent keys are absent from
2265
nodes = self._get_entries(keys)
2269
result[node[1]] = node[3][0]
2272
result[node[1]] = None
2275
def get_position(self, key):
2276
"""Return details needed to access the version.
2278
:return: a tuple (index, data position, size) to hand to the access
2279
logic to get the record.
2281
node = self._get_node(key)
2282
return self._node_to_position(node)
2285
"""Get all the keys in the collection.
2287
The keys are not ordered.
2290
return [node[1] for node in self._graph_index.iter_all_entries()]
2292
def _node_to_position(self, node):
2293
"""Convert an index value to position details."""
2294
bits = node[2][1:].split(' ')
2295
return node[0], int(bits[0]), int(bits[1])
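
# Another illustrative, self-contained sketch (not part of the knit code):
# _KnitGraphIndex stores each record's location as a small string value,
# "<no-eol flag><offset> <size>", which is built in add_records and unpacked
# again in _node_to_position. The hypothetical helpers below show that round
# trip, assuming a space is used when the no-eol flag is not set.

def _example_pack_value(pos, size, no_eol=False):
    # first byte is the no-eol flag ('N' when set), then "offset size"
    if no_eol:
        value = 'N'
    else:
        value = ' '
    value += '%d %d' % (pos, size)
    return value

def _example_unpack_value(value):
    # mirror of _node_to_position: skip the flag byte, split offset and size
    bits = value[1:].split(' ')
    return int(bits[0]), int(bits[1])

# e.g. _example_unpack_value(_example_pack_value(130, 1542)) == (130, 1542)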
2298
class _KnitKeyAccess(object):
2299
"""Access to records in .knit files."""
2301
def __init__(self, transport, mapper):
2302
"""Create a _KnitKeyAccess with transport and mapper.
2304
:param transport: The transport the access object is rooted at.
2305
:param mapper: The mapper used to map keys to .knit files.
2307
self._transport = transport
2308
self._mapper = mapper
2310
def add_raw_records(self, key_sizes, raw_data):
2311
"""Add raw knit bytes to a storage area.
2313
Each record's bytes are appended to the .knit file that the mapper
assigns to its key.

:param sizes: An iterable of tuples containing the key and size of each
    raw data segment.
:param raw_data: A bytestring containing the data.
2319
:return: A list of memos to retrieve the record later. Each memo is an
2320
opaque index memo. For _KnitKeyAccess the memo is (key, pos,
2321
length), where the key is the record key.
2323
if type(raw_data) != str:
2324
raise AssertionError(
2325
'data must be plain bytes was %s' % type(raw_data))
2328
# TODO: This can be tuned for writing to sftp and other servers where
2329
# append() is relatively expensive by grouping the writes to each key
2331
for key, size in key_sizes:
2332
path = self._mapper.map(key)
2334
base = self._transport.append_bytes(path + '.knit',
2335
raw_data[offset:offset+size])
2336
except errors.NoSuchFile:
2337
self._transport.mkdir(osutils.dirname(path))
2338
base = self._transport.append_bytes(path + '.knit',
2339
raw_data[offset:offset+size])
2343
result.append((key, base, size))
2346
def get_raw_records(self, memos_for_retrieval):
2347
"""Get the raw bytes for a records.
2349
:param memos_for_retrieval: An iterable containing the access memo for
2350
retrieving the bytes.
2351
:return: An iterator over the bytes of the records.
2353
# first pass, group into same-index request to minimise readv's issued.
2355
current_prefix = None
2356
for (key, offset, length) in memos_for_retrieval:
2357
if current_prefix == key[:-1]:
2358
current_list.append((offset, length))
2360
if current_prefix is not None:
2361
request_lists.append((current_prefix, current_list))
2362
current_prefix = key[:-1]
2363
current_list = [(offset, length)]
2364
# handle the last entry
2365
if current_prefix is not None:
2366
request_lists.append((current_prefix, current_list))
2367
for prefix, read_vector in request_lists:
2368
path = self._mapper.map(prefix) + '.knit'
2369
for pos, data in self._transport.readv(path, read_vector):
    yield data
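
# Illustrative, self-contained sketch (a hypothetical helper, not used by the
# classes here): both get_raw_records implementations group consecutive memos
# that share a file (or pack index) so that a single readv request list is
# issued per file rather than one request per record.

def _example_group_memos(memos):
    """Group (group_key, offset, length) memos into per-group read vectors.

    Consecutive memos with the same group_key are collected together, so
    callers should present memos already sorted by group for best effect.
    """
    request_lists = []
    current_key = None
    current_list = []
    for group_key, offset, length in memos:
        if group_key == current_key:
            current_list.append((offset, length))
        else:
            if current_key is not None:
                request_lists.append((current_key, current_list))
            current_key = group_key
            current_list = [(offset, length)]
    if current_key is not None:
        request_lists.append((current_key, current_list))
    return request_lists

# e.g. _example_group_memos([('a', 0, 10), ('a', 10, 5), ('b', 0, 7)])
# returns [('a', [(0, 10), (10, 5)]), ('b', [(0, 7)])].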
2373
class _DirectPackAccess(object):
2374
"""Access to data in one or more packs with less translation."""
2376
def __init__(self, index_to_packs):
2377
"""Create a _DirectPackAccess object.
2379
:param index_to_packs: A dict mapping index objects to the transport
2380
and file names for obtaining data.
2382
self._container_writer = None
2383
self._write_index = None
2384
self._indices = index_to_packs
2386
def add_raw_records(self, key_sizes, raw_data):
2387
"""Add raw knit bytes to a storage area.
2389
The data is spooled to the container writer in one bytes-record per
raw data record.

:param sizes: An iterable of tuples containing the key and size of each
    raw data segment.
:param raw_data: A bytestring containing the data.
2395
:return: A list of memos to retrieve the record later. Each memo is an
2396
opaque index memo. For _DirectPackAccess the memo is (index, pos,
2397
length), where the index field is the write_index object supplied
2398
to the PackAccess object.
2400
if type(raw_data) != str:
2401
raise AssertionError(
2402
'data must be plain bytes was %s' % type(raw_data))
2405
for key, size in key_sizes:
2406
p_offset, p_length = self._container_writer.add_bytes_record(
2407
raw_data[offset:offset+size], [])
2409
result.append((self._write_index, p_offset, p_length))
2412
def get_raw_records(self, memos_for_retrieval):
2413
"""Get the raw bytes for a records.
2415
:param memos_for_retrieval: An iterable containing the (index, pos,
2416
length) memo for retrieving the bytes. The Pack access method
2417
looks up the pack to use for a given record in its index_to_pack
2419
:return: An iterator over the bytes of the records.
2421
# first pass, group into same-index requests
2423
current_index = None
2424
for (index, offset, length) in memos_for_retrieval:
2425
if current_index == index:
2426
current_list.append((offset, length))
2428
if current_index is not None:
2429
request_lists.append((current_index, current_list))
2430
current_index = index
2431
current_list = [(offset, length)]
2432
# handle the last entry
2433
if current_index is not None:
2434
request_lists.append((current_index, current_list))
2435
for index, offsets in request_lists:
2436
transport, path = self._indices[index]
2437
reader = pack.make_readv_reader(transport, path, offsets)
2438
for names, read_func in reader.iter_records():
2439
yield read_func(None)
2441
def set_writer(self, writer, index, transport_packname):
2442
"""Set a writer to use for adding data."""
2443
if index is not None:
2444
self._indices[index] = transport_packname
2445
self._container_writer = writer
2446
self._write_index = index
2449
# Deprecated, use PatienceSequenceMatcher instead
2450
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher
2453
def annotate_knit(knit, revision_id):
2454
"""Annotate a knit with no cached annotations.
2456
This implementation is for knits with no cached annotations.
It will work for knits with cached annotations, but this is not
recommended.
"""
2460
annotator = _KnitAnnotator(knit)
2461
return iter(annotator.annotate(revision_id))
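
# Illustrative, self-contained sketch (hypothetical names) of the scheduling
# idea _KnitAnnotator uses below: a node can only be annotated once all of
# its present parents have been, so nodes that are not yet ready are parked
# under one of their unfinished parents and revisited when that parent
# completes.

def _example_annotation_order(graph):
    """Yield node ids from {node: [parent ids]} so parents precede children.

    Parents that are absent from the graph are treated as ghosts and ignored,
    mirroring the way the annotator drops ghosts before scheduling work.
    """
    pending_children = {}
    ready = []
    done = set()
    for node, parents in graph.items():
        present = [p for p in parents if p in graph]
        if not present:
            ready.append(node)
        else:
            # park the node under one of its unfinished parents
            pending_children.setdefault(present[0], []).append(node)
    while ready:
        node = ready.pop()
        done.add(node)
        yield node
        # wake any children that were waiting on this node
        for child in pending_children.pop(node, []):
            missing = [p for p in graph[child]
                       if p in graph and p not in done]
            if missing:
                pending_children.setdefault(missing[0], []).append(child)
            else:
                ready.append(child)

# e.g. list(_example_annotation_order({'a': [], 'b': ['a'], 'c': ['b', 'g']}))
# yields 'a', then 'b', then 'c' ('g' is treated as a ghost).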
2464
class _KnitAnnotator(object):
2465
"""Build up the annotations for a text."""
2467
def __init__(self, knit):
2470
# Content objects, differs from fulltexts because of how final newlines
2471
# are treated by knits. the content objects here will always have a
# final newline.
self._fulltext_contents = {}
2475
# Annotated lines of specific revisions
2476
self._annotated_lines = {}
2478
# Track the raw data for nodes that we could not process yet.
2479
# This maps the revision_id of the base to a list of children that will
2480
# annotated from it.
2481
self._pending_children = {}
2483
# Nodes which cannot be extracted
2484
self._ghosts = set()
2486
# Track how many children this node has, so we know if we need to keep
2488
self._annotate_children = {}
2489
self._compression_children = {}
2491
self._all_build_details = {}
2492
# The children => parent revision_id graph
2493
self._revision_id_graph = {}
2495
self._heads_provider = None
2497
self._nodes_to_keep_annotations = set()
2498
self._generations_until_keep = 100
2500
def set_generations_until_keep(self, value):
2501
"""Set the number of generations before caching a node.
2503
Setting this to -1 will cache every merge node, setting this higher
2504
will cache fewer nodes.
2506
self._generations_until_keep = value
2508
def _add_fulltext_content(self, revision_id, content_obj):
2509
self._fulltext_contents[revision_id] = content_obj
2510
# TODO: jam 20080305 It might be good to check the sha1digest here
2511
return content_obj.text()
2513
def _check_parents(self, child, nodes_to_annotate):
2514
"""Check if all parents have been processed.
2516
:param child: A tuple of (rev_id, parents, raw_content)
2517
:param nodes_to_annotate: If child is ready, add it to
2518
nodes_to_annotate, otherwise put it back in self._pending_children
2520
for parent_id in child[1]:
2521
if (parent_id not in self._annotated_lines):
2522
# This parent is present, but another parent is missing
2523
self._pending_children.setdefault(parent_id,
2527
# This one is ready to be processed
2528
nodes_to_annotate.append(child)
2530
def _add_annotation(self, revision_id, fulltext, parent_ids,
2531
left_matching_blocks=None):
2532
"""Add an annotation entry.
2534
All parents should already have been annotated.
2535
:return: A list of children that now have their parents satisfied.
2537
a = self._annotated_lines
2538
annotated_parent_lines = [a[p] for p in parent_ids]
2539
annotated_lines = list(annotate.reannotate(annotated_parent_lines,
2540
fulltext, revision_id, left_matching_blocks,
2541
heads_provider=self._get_heads_provider()))
2542
self._annotated_lines[revision_id] = annotated_lines
2543
for p in parent_ids:
2544
ann_children = self._annotate_children[p]
2545
ann_children.remove(revision_id)
2546
if (not ann_children
2547
and p not in self._nodes_to_keep_annotations):
2548
del self._annotated_lines[p]
2549
del self._all_build_details[p]
2550
if p in self._fulltext_contents:
2551
del self._fulltext_contents[p]
2552
# Now that we've added this one, see if there are any pending
2553
# deltas to be done, certainly this parent is finished
2554
nodes_to_annotate = []
2555
for child in self._pending_children.pop(revision_id, []):
2556
self._check_parents(child, nodes_to_annotate)
2557
return nodes_to_annotate
2559
def _get_build_graph(self, key):
2560
"""Get the graphs for building texts and annotations.
2562
The data you need for creating a full text may be different than the
2563
data you need to annotate that text. (At a minimum, you need both
2564
parents to create an annotation, but only need 1 parent to generate the
fulltext.)

:return: A list of (key, index_memo) records, suitable for
    passing to read_records_iter to start reading in the raw data from
    the pack file.
"""
2571
if key in self._annotated_lines:
2574
pending = set([key])
2579
# get all pending nodes
2581
this_iteration = pending
2582
build_details = self._knit._index.get_build_details(this_iteration)
2583
self._all_build_details.update(build_details)
2584
# new_nodes = self._knit._index._get_entries(this_iteration)
2586
for key, details in build_details.iteritems():
2587
(index_memo, compression_parent, parents,
2588
record_details) = details
2589
self._revision_id_graph[key] = parents
2590
records.append((key, index_memo))
2591
# Do we actually need to check _annotated_lines?
2592
pending.update(p for p in parents
2593
if p not in self._all_build_details)
2594
if compression_parent:
2595
self._compression_children.setdefault(compression_parent,
2598
for parent in parents:
2599
self._annotate_children.setdefault(parent,
2601
num_gens = generation - kept_generation
2602
if ((num_gens >= self._generations_until_keep)
2603
and len(parents) > 1):
2604
kept_generation = generation
2605
self._nodes_to_keep_annotations.add(key)
2607
missing_versions = this_iteration.difference(build_details.keys())
2608
self._ghosts.update(missing_versions)
2609
for missing_version in missing_versions:
2610
# add a key, no parents
2611
self._revision_id_graph[missing_version] = ()
2612
pending.discard(missing_version) # don't look for it
2613
if self._ghosts.intersection(self._compression_children):
    raise KnitCorrupt(self,
        "We cannot have nodes which have a ghost compression parent:\n"
        "ghosts: %r\n"
        "compression children: %r"
        % (self._ghosts, self._compression_children))
2619
# Cleanout anything that depends on a ghost so that we don't wait for
2620
# the ghost to show up
2621
for node in self._ghosts:
2622
if node in self._annotate_children:
2623
# We won't be building this node
2624
del self._annotate_children[node]
2625
# Generally we will want to read the records in reverse order, because
2626
# we find the parent nodes after the children
2630
def _annotate_records(self, records):
2631
"""Build the annotations for the listed records."""
2632
# We iterate in the order read, rather than a strict order requested
2633
# However, process what we can, and put off to the side things that
2634
# still need parents, cleaning them up when those parents are
2636
for (rev_id, record,
2637
digest) in self._knit._read_records_iter(records):
2638
if rev_id in self._annotated_lines:
2640
parent_ids = self._revision_id_graph[rev_id]
2641
parent_ids = [p for p in parent_ids if p not in self._ghosts]
2642
details = self._all_build_details[rev_id]
2643
(index_memo, compression_parent, parents,
2644
record_details) = details
2645
nodes_to_annotate = []
2646
# TODO: Remove the punning between compression parents, and
2647
# parent_ids, we should be able to do this without assuming
2649
if len(parent_ids) == 0:
2650
# There are no parents for this node, so just add it
2651
# TODO: This probably needs to be decoupled
2652
fulltext_content, delta = self._knit._factory.parse_record(
2653
rev_id, record, record_details, None)
2654
fulltext = self._add_fulltext_content(rev_id, fulltext_content)
2655
nodes_to_annotate.extend(self._add_annotation(rev_id, fulltext,
2656
parent_ids, left_matching_blocks=None))
2658
child = (rev_id, parent_ids, record)
2659
# Check if all the parents are present
2660
self._check_parents(child, nodes_to_annotate)
2661
while nodes_to_annotate:
2662
# Should we use a queue here instead of a stack?
2663
(rev_id, parent_ids, record) = nodes_to_annotate.pop()
2664
(index_memo, compression_parent, parents,
2665
record_details) = self._all_build_details[rev_id]
2667
if compression_parent is not None:
2668
comp_children = self._compression_children[compression_parent]
2669
if rev_id not in comp_children:
2670
raise AssertionError("%r not in compression children %r"
2671
% (rev_id, comp_children))
2672
# If there is only 1 child, it is safe to reuse this
2674
reuse_content = (len(comp_children) == 1
2675
and compression_parent not in
2676
self._nodes_to_keep_annotations)
2678
# Remove it from the cache since it will be changing
2679
parent_fulltext_content = self._fulltext_contents.pop(compression_parent)
2680
# Make sure to copy the fulltext since it might be
2682
parent_fulltext = list(parent_fulltext_content.text())
2684
parent_fulltext_content = self._fulltext_contents[compression_parent]
2685
parent_fulltext = parent_fulltext_content.text()
2686
comp_children.remove(rev_id)
2687
fulltext_content, delta = self._knit._factory.parse_record(
2688
rev_id, record, record_details,
2689
parent_fulltext_content,
2690
copy_base_content=(not reuse_content))
2691
fulltext = self._add_fulltext_content(rev_id,
2693
if compression_parent == parent_ids[0]:
2694
# the compression_parent is the left parent, so we can
2696
blocks = KnitContent.get_line_delta_blocks(delta,
2697
parent_fulltext, fulltext)
2699
fulltext_content = self._knit._factory.parse_fulltext(
2701
fulltext = self._add_fulltext_content(rev_id,
2703
nodes_to_annotate.extend(
2704
self._add_annotation(rev_id, fulltext, parent_ids,
2705
left_matching_blocks=blocks))
2707
def _get_heads_provider(self):
2708
"""Create a heads provider for resolving ancestry issues."""
2709
if self._heads_provider is not None:
2710
return self._heads_provider
2711
parent_provider = _mod_graph.DictParentsProvider(
2712
self._revision_id_graph)
2713
graph_obj = _mod_graph.Graph(parent_provider)
2714
head_cache = _mod_graph.FrozenHeadsCache(graph_obj)
2715
self._heads_provider = head_cache
2718
def annotate(self, key):
2719
"""Return the annotated fulltext at the given key.
2721
:param key: The key to annotate.
2723
if len(self._knit._fallback_vfs) > 0:
2724
# stacked knits can't use the fast path at present.
2725
return self._simple_annotate(key)
2726
records = self._get_build_graph(key)
2727
if key in self._ghosts:
2728
raise errors.RevisionNotPresent(key, self._knit)
2729
self._annotate_records(records)
2730
return self._annotated_lines[key]
2732
def _simple_annotate(self, key):
2733
"""Return annotated fulltext, rediffing from the full texts.
2735
This is slow but makes no assumptions about the repository
2736
being able to produce line deltas.
2738
# TODO: this code generates a parent maps of present ancestors; it
2739
# could be split out into a separate method, and probably should use
2740
# iter_ancestry instead. -- mbp and robertc 20080704
2741
graph = _mod_graph.Graph(self._knit)
2742
head_cache = _mod_graph.FrozenHeadsCache(graph)
2743
search = graph._make_breadth_first_searcher([key])
2747
present, ghosts = search.next_with_ghosts()
2748
except StopIteration:
2750
keys.update(present)
2751
parent_map = self._knit.get_parent_map(keys)
2753
reannotate = annotate.reannotate
2754
for record in self._knit.get_record_stream(keys, 'topological', True):
2756
fulltext = split_lines(record.get_bytes_as('fulltext'))
2757
parents = parent_map[key]
2758
if parents is not None:
2759
parent_lines = [parent_cache[parent] for parent in parent_map[key]]
2762
parent_cache[key] = list(
2763
reannotate(parent_lines, fulltext, key, None, head_cache))
2765
return parent_cache[key]
2767
raise errors.RevisionNotPresent(key, self._knit)
try:
    from bzrlib._knit_load_data_c import _load_data_c as _load_data
except ImportError:
    from bzrlib._knit_load_data_py import _load_data_py as _load_data