352
352
where, size = self._data.add_record(version_id, digest, store_lines)
353
353
self._index.add_version(version_id, options, where, size, parents)
355
def _add_raw_records(self, records, data):
356
"""Add all the records 'records' with data pre-joined in 'data'.
358
:param records: A list of tuples(version_id, options, parents, size).
359
:param data: The data for the records. When it is written, the records
360
are adjusted to have pos pointing into data by the sum of
361
the preceeding records sizes.
364
pos = self._data.add_raw_record(data)
366
for (version_id, options, parents, size) in records:
367
index_entries.append((version_id, options, pos, size, parents))
369
self._index.add_versions(index_entries)
355
371
def clear_cache(self):
356
372
"""Clear the data cache only."""
357
373
self._data.clear_cache()
1141
1157
def add_version(self, version_id, options, pos, size, parents):
1142
1158
"""Add a version record to the index."""
1143
self._cache_version(version_id, options, pos, size, parents)
1145
content = "\n%s %s %s %s %s :" % (version_id.encode('utf-8'),
1149
self._version_list_to_index(parents))
1150
assert isinstance(content, str), 'content must be utf-8 encoded'
1151
self._transport.append(self._filename, StringIO(content))
1159
self.add_versions(((version_id, options, pos, size, parents),))
1161
def add_versions(self, versions):
1162
"""Add multiple versions to the index.
1164
:param versions: a list of tuples:
1165
(version_id, options, pos, size, parents).
1168
for version_id, options, pos, size, parents in versions:
1169
line = "\n%s %s %s %s %s :" % (version_id.encode('utf-8'),
1173
self._version_list_to_index(parents))
1174
assert isinstance(line, str), \
1175
'content must be utf-8 encoded: %r' % (line,)
1177
self._transport.append(self._filename, StringIO(''.join(lines)))
1178
# cache after writing, so that a failed write leads to missing cache
1179
# entries not extra ones. XXX TODO: RBC 20060502 in the event of a
1180
# failure, reload the index or flush it or some such, to prevent
1181
# writing records that did complete twice.
1182
for version_id, options, pos, size, parents in versions:
1183
self._cache_version(version_id, options, pos, size, parents)
1153
1185
def has_version(self, version_id):
1154
1186
"""True if the version is in the index."""
1155
1187
return self._cache.has_key(version_id)
1235
1267
return length, sio
1237
1269
def add_raw_record(self, raw_data):
1238
"""Append a prepared record to the data file."""
1270
"""Append a prepared record to the data file.
1272
:return: the offset in the data file raw_data was written.
1239
1274
assert isinstance(raw_data, str), 'data must be plain bytes'
1240
start_pos = self._transport.append(self._filename, StringIO(raw_data))
1241
return start_pos, len(raw_data)
1275
return self._transport.append(self._filename, StringIO(raw_data))
1243
1277
def add_record(self, version_id, digest, lines):
1244
1278
"""Write new text record to disk. Returns the position in the
1443
1477
# data suck the join:
1445
1479
total = len(version_list)
1446
# we want the raw gzip for bulk copying, but the record validated
1447
# just enough to be sure its the right one.
1448
# TODO: consider writev or write combining to reduce
1449
# death of a thousand cuts feeling.
1450
1482
for (version_id, raw_data), \
1451
1483
(version_id2, options, parents) in \
1452
1484
izip(self.source._data.read_records_iter_raw(copy_queue_records),
1454
1486
assert version_id == version_id2, 'logic error, inconsistent results'
1455
1487
count = count + 1
1456
1488
pb.update("Joining knit", count, total)
1457
pos, size = self.target._data.add_raw_record(raw_data)
1458
self.target._index.add_version(version_id, options, pos, size, parents)
1489
raw_records.append((version_id, options, parents, len(raw_data)))
1490
raw_datum.append(raw_data)
1491
self.target._add_raw_records(raw_records, ''.join(raw_datum))
1460
1493
for version in mismatched_versions:
1461
1494
# FIXME RBC 20060309 is this needed?