# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""Knit versionedfile implementation.
lifeless: the data file is made up of "delta records". each delta record has a delta header
that contains: (1) a version id, (2) the size of the delta (in lines), and (3) the digest of
the -expanded data- (i.e., the delta applied to the parent). the delta also ends with an
end-marker; simply "end VERSION"
delta can be line or full contents.
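As a purely illustrative sketch (the field layout is inferred from the
description above; the version id, digest, and hunk numbers are invented, not
taken from a real knit), a line-delta record in the data file might look
roughly like:

  version example-revision-id 3 0123456789abcdef0123456789abcdef01234567
  8,8,2
  replacement line one
  replacement line two
  end example-revision-id

where "3" is the size of the delta in lines, the hex string is the digest of
the expanded text, and "8,8,2" is assumed here to be a start,end,count hunk
header against the parent text.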
# 10:16 < lifeless> make partial index writes safe
# 10:16 < lifeless> implement 'knit.check()' like weave.check()
# 10:17 < lifeless> record known ghosts so we can detect when they are filled in rather than the current 'reweave'
# move sha1 out of the content so that join is faster at verifying parents
# record content length ?
from cStringIO import StringIO
from itertools import izip, chain
class KnitContentFactory(ContentFactory):
"""Content factory for streaming from knits.
:seealso ContentFactory:
def __init__(self, key, parents, build_details, sha1, raw_record,
annotated, knit=None, network_bytes=None):
"""Create a KnitContentFactory for key.
:param key: The key.
:param parents: The parents.
:param build_details: The build details as returned from
if self._network_bytes is None:
self._create_network_bytes()
return self._network_bytes
if ('-ft-' in self.storage_kind and
storage_kind in ('chunked', 'fulltext')):
adapter_key = (self.storage_kind, 'fulltext')
adapter_factory = adapter_registry.get(adapter_key)
adapter = adapter_factory(None)
bytes = adapter.get_bytes(self)
if storage_kind == 'chunked':
if self._knit is not None:
# Not redundant with direct conversion above - that only handles
if storage_kind == 'chunked':
return self._knit.get_lines(self.key[0])
elif storage_kind == 'fulltext':
def __init__(self, key, parents, generator, first):
"""Create a LazyKnitContentFactory.
:param key: The key of the record.
:param parents: The parents of the record.
:param generator: A _ContentMapGenerator containing the record for this
class KnitContent(object):
"""Content of a knit version to which deltas can be applied.
This is always stored in memory as a list of lines with \n at the end,
plus a flag saying if the final ending is really there or not, because that
corresponds to the on-disk knit representation.
versioned_files.writer.end()
def _get_total_build_size(self, keys, positions):
"""Determine the total bytes to build these keys.
(helper function because _KnitGraphIndex and _KndxIndex work the same, but
don't inherit from a common base.)
:param keys: Keys that we want to build
:param positions: dict of {key: (info, index_memo, comp_parent)} (such
as returned by _get_components_positions)
:return: Number of bytes to build those keys
all_build_index_memos = {}
for key in build_keys:
# This is mostly for the 'stacked' case
# Where we will be getting the data from a fallback
if key not in positions:
_, index_memo, compression_parent = positions[key]
all_build_index_memos[key] = index_memo
if compression_parent not in all_build_index_memos:
next_keys.add(compression_parent)
build_keys = next_keys
return sum([index_memo[2] for index_memo
in all_build_index_memos.itervalues()])
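# A hedged illustration of the computation above (all values invented): each
# index_memo is assumed to be an (index, offset, length) tuple, so summing
# index_memo[2] over the whole compression chain gives the bytes to read.
#
#   positions = {
#       ('file-id', 'rev-1'): (details, (idx, 0, 120), None),
#       ('file-id', 'rev-2'): (details, (idx, 120, 75), ('file-id', 'rev-1')),
#   }
#
# Asking to build ('file-id', 'rev-2') pulls in its compression parent as
# well, giving a total build size of 120 + 75 = 195 bytes.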
class KnitVersionedFiles(VersionedFiles):
"""Storage for many versioned files using knit compression.
Backend storage is managed by indices and data objects.
:ivar _index: A _KnitGraphIndex or similar that can describe the
parents, graph, compression and data location of entries in this
KnitVersionedFiles. Note that this is only the index for
*this* vfs; if there are fallbacks they must be queried separately.
lines[-1] = lines[-1] + '\n'
line_bytes += '\n'
for element in key[:-1]:
if type(element) != str:
raise TypeError("key contains non-strings: %r" % (key,))
key = key[:-1] + ('sha1:' + digest,)
elif type(key[-1]) != str:
raise TypeError("key contains non-strings: %r" % (key,))
# Knit hunks are still last-element only
version_id = key[-1]
content = self._factory.make(lines, version_id)
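# Hedged illustration (invented values): a content key such as
# ('file-id', None) whose last element is None is rewritten above to
# ('file-id', 'sha1:' + digest), and only that last element is then used as
# the knit-level version_id.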
options.append('fulltext')
# isinstance is slower and we have no hierarchy.
if self._factory.__class__ is KnitPlainFactory:
# Use the already joined bytes saving iteration time in
# _record_to_data.
size, bytes = self._record_to_data(key, digest,
def _check_add(self, key, lines, random_id, check_content):
"""check that version_id and lines are safe to add."""
version_id = key[-1]
if version_id is not None:
if contains_whitespace(version_id):
raise InvalidRevisionId(version_id, self)
self.check_not_reserved_id(version_id)
# TODO: If random_id==False and the key is already present, we should
# probably check that the existing content is identical to what is
# being inserted, and otherwise raise an exception. This would make
build-parent of the version, i.e. the leftmost ancestor.
Will be None if the record is not a delta.
:param keys: The keys to build a map for
:param allow_missing: If some records are missing, rather than
error, just return the data that could be generated.
raw_map = self._get_record_map_unparsed(keys,
records = [(key, i_m) for key, (r, i_m, n)
in position_map.iteritems()]
# Sort by the index memo, so that we request records from the
# same pack file together, and in forward-sorted order
records.sort(key=operator.itemgetter(1))
raw_record_map = {}
for key, data in self._read_records_iter_unchecked(records):
(record_details, index_memo, next) = position_map[key]
except errors.RetryWithNewPacks, e:
self._access.reload_or_raise(e)
def _split_by_prefix(cls, keys):
"""For the given keys, split them up based on their prefix.
To keep memory pressure somewhat under control, split the
This should be revisited if _get_content_maps() can ever cross
file-id boundaries.
The keys for a given file_id are kept in the same relative order.
Ordering between file_ids is not, though prefix_order will return the
order that the key was first seen.
:param keys: An iterable of key tuples
:return: (split_map, prefix_order)
split_map A dictionary mapping prefix => keys
prefix_order The order that we saw the various prefixes
split_by_prefix = {}
for key in keys:
if len(key) == 1:
if prefix in split_by_prefix:
split_by_prefix[prefix].append(key)
split_by_prefix[prefix] = [key]
prefix_order.append(prefix)
return split_by_prefix, prefix_order
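# Hedged example (keys invented): for keys ('f1', 'r1'), ('f2', 'r1') and
# ('f1', 'r2'), the split above yields
#   split_map    == {'f1': [('f1', 'r1'), ('f1', 'r2')], 'f2': [('f2', 'r1')]}
#   prefix_order == ['f1', 'f2']
# while single-element keys such as ('r3',) are grouped under the '' prefix.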
def _group_keys_for_io(self, keys, non_local_keys, positions,
_min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
"""For the given keys, group them into 'best-sized' requests.
The idea is to avoid making 1 request per file, but to never try to
unpack an entire 1.5GB source tree in a single pass. Also when
possible, we should try to group requests to the same pack file
:return: list of (keys, non_local) tuples that indicate what keys
should be fetched next.
# TODO: Ideally we would group on 2 factors. We want to extract texts
# from the same pack file together, and we want to extract all
# the texts for a given build-chain together. Ultimately it
# probably needs a better global view.
total_keys = len(keys)
prefix_split_keys, prefix_order = self._split_by_prefix(keys)
prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
cur_non_local = set()
for prefix in prefix_order:
keys = prefix_split_keys[prefix]
non_local = prefix_split_non_local_keys.get(prefix, [])
this_size = self._index._get_total_build_size(keys, positions)
cur_size += this_size
cur_keys.extend(keys)
cur_non_local.update(non_local)
if cur_size > _min_buffer_size:
result.append((cur_keys, cur_non_local))
sizes.append(cur_size)
cur_non_local = set()
result.append((cur_keys, cur_non_local))
sizes.append(cur_size)
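# A standalone, hedged sketch of the buffering idea above (not part of the
# original module; names and sizes are invented). Prefixes are accumulated in
# first-seen order and a group is emitted once the running build size passes
# the threshold, mirroring the cur_size/_min_buffer_size handling above.
def _example_group_by_size(prefix_sizes, min_buffer_size=100):
    # prefix_sizes: list of (prefix, build_size) pairs in first-seen order.
    groups = []
    cur_prefixes = []
    cur_size = 0
    for prefix, size in prefix_sizes:
        cur_prefixes.append(prefix)
        cur_size += size
        if cur_size > min_buffer_size:
            groups.append(cur_prefixes)
            cur_prefixes = []
            cur_size = 0
    if cur_prefixes:
        groups.append(cur_prefixes)
    return groups
# _example_group_by_size([('f1', 60), ('f2', 70), ('f3', 20)])
# => [['f1', 'f2'], ['f3']]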
def get_record_stream(self, keys, ordering, include_delta_closure):
"""Get a stream of records for keys.
needed_from_fallback.add(key)
# Double index lookups here : need a unified api ?
global_map, parent_maps = self._get_parent_map_with_sources(keys)
if ordering in ('topological', 'groupcompress'):
if ordering == 'topological':
# Global topological sort
present_keys = tsort.topo_sort(global_map)
present_keys = sort_groupcompress(global_map)
# Now group by source:
source_keys = []
current_source = None
# XXX: get_content_maps performs its own index queries; allow state
# to be passed in.
non_local_keys = needed_from_fallback - absent_keys
for keys, non_local_keys in self._group_keys_for_io(present_keys,
generator = _VFContentMapGenerator(self, keys, non_local_keys,
for record in generator.get_record_stream():
access_memo = self._access.add_raw_records(
[(record.key, len(bytes))], bytes)[0]
index_entry = (record.key, options, access_memo, parents)
if 'fulltext' not in options:
# Not a fulltext, so we need to make sure the compression
# parent will also be present.
except errors.RevisionAlreadyPresent:
# Add any records whose basis parent is now available.
added_keys = [record.key]
key = added_keys.pop(0)
if key in buffered_index_entries:
index_entries = buffered_index_entries[key]
self._index.add_records(index_entries)
[index_entry[0] for index_entry in index_entries])
del buffered_index_entries[key]
if buffered_index_entries:
# There were index entries buffered at the end of the stream,
# So these need to be added (if the index supports holding such
key_records.append((key, details[0]))
records_iter = enumerate(self._read_records_iter(key_records))
for (key_idx, (key, data, sha_value)) in records_iter:
pb.update('Walking content', key_idx, total)
compression_parent = build_details[key][1]
if compression_parent is None:
line_iterator = self._factory.get_fulltext_content(data)
line_iterator = self._factory.get_linedelta_content(data)
# Now that we are yielding the data for this key, remove it
# from the list
# If there are still keys we've not yet found, we look in the fallback
# vfs, and hope to find them there. Note that if the keys are found
# but had no changes or no content, the fallback may not return
if keys and not self._fallback_vfs:
# XXX: strictly the second parameter is meant to be the file id
# but it's not easily accessible here.
source_keys.add(key)
yield line, key
keys.difference_update(source_keys)
pb.update('Walking content', total, total)
def _make_line_delta(self, delta_seq, new_content):
"""Generate a line delta from delta_seq and new_content."""
delta=None, annotated=None,
left_matching_blocks=None):
"""Merge annotations for content and generate deltas.
This is done by comparing the annotations based on changes to the text
and generating a delta on the resulting full texts. If annotations are
not being created then a simple delta is created.
def _record_to_data(self, key, digest, lines, dense_lines=None):
"""Convert key, digest, lines into a raw data block.
:param key: The key of the record. Currently keys are always serialised
using just the trailing component.
:param dense_lines: The bytes of lines but in a denser form. For
def __init__(self, versioned_files, keys, nonlocal_keys=None,
global_map=None, raw_record_map=None):
"""Create a _ContentMapGenerator.
:param versioned_files: The versioned files that the texts are being
extracted from.
:param keys: The keys to produce content maps for.
Duplicate entries may be written to the index for a single version id
if this is done then the latter one completely replaces the former:
this allows updates to correct version and parent information.
Note that the two entries may share the delta, and that successive
annotations and references MUST point to the first entry.
The index file on disc contains a header, followed by one line per knit
record. The same revision can be present in an index file more than once.
The first occurrence gets assigned a sequence number starting from 0.
The format of a single line is
REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
REVISION_ID is a utf8-encoded revision id
FLAGS is a comma separated list of flags about the record. Values include
no-eol, line-delta, fulltext.
BYTE_OFFSET is the ascii representation of the byte offset in the data file
that the compressed data starts at.
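For illustration only (hedged: the revision ids and all numbers below are
invented, and parents are shown as sequence numbers, which is only one of the
two parent forms the format line above allows), two index lines might look
roughly like:

  example-rev-1 fulltext 0 120 :
  example-rev-2 line-delta 120 75 0 :

Here "0" on the second line refers back to example-rev-1 by the sequence
number assigned to its first occurrence.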
def add_records(self, records, random_id=False, missing_compression_parents=False):
"""Add multiple records to the index.
:param records: a list of tuples:
(key, options, access_memo, parents).
:param random_id: If True the ids being added were randomly generated
# Because kndx files do not support atomic insertion via separate index
# files, they do not support this method.
raise NotImplementedError(self.get_missing_compression_parents)
def _cache_key(self, key, options, pos, size, parent_keys):
"""Cache a version record in the history array and index cache.
:param is_locked: A callback to check whether the object should answer
:param deltas: Allow delta-compressed records.
:param parents: If True, record knits parents, if not do not record
:param add_callback: If not None, allow additions to the index and call
this callback with a list of added GraphIndex nodes:
def add_records(self, records, random_id=False,
missing_compression_parents=False):
"""Add multiple records to the index.
This function does not insert data into the Immutable GraphIndex
backing the KnitGraphIndex, instead it prepares data for insertion by
the caller and checks that it is safe to insert, then calls
class _DirectPackAccess(object):
"""Access to data in one or more packs with less translation."""
def __init__(self, index_to_packs, reload_func=None, flush_func=None):
"""Create a _DirectPackAccess object.
:param index_to_packs: A dict mapping index objects to the transport
result.append((self._write_index, p_offset, p_length))
"""Flush pending writes on this access object.
This will flush any buffered writes to a NewPack.
if self._flush_func is not None:
def get_raw_records(self, memos_for_retrieval):
"""Get the raw bytes for a records.
:param memos_for_retrieval: An iterable containing the (index, pos,
length) memo for retrieving the bytes. The Pack access method
looks up the pack to use for a given record in its index_to_pack