195
195
for key in sorted(key_to_text):
196
196
compressor.compress(key, key_to_text[key], None)
197
entries = compressor._block._entries
198
raw_bytes = compressor.flush()
199
return entries, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)
197
block = compressor.flush()
198
entries = block._entries
199
# Go through from_bytes(to_bytes()) so that we start with a compressed
201
return entries, groupcompress.GroupCompressBlock.from_bytes(
201
204
def test_from_empty_bytes(self):
202
205
self.assertRaises(ValueError,
596
599
for key in sorted(key_to_text):
597
600
compressor.compress(key, key_to_text[key], None)
598
entries = compressor._block._entries
599
raw_bytes = compressor.flush()
601
block = compressor.flush()
602
entries = block._entries
603
raw_bytes = block.to_bytes()
600
604
return entries, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)
602
606
def add_key_to_manager(self, key, entries, block, manager):
692
696
text = self._texts[record.key]
693
697
self.assertEqual(text, record.get_bytes_as('fulltext'))
694
698
self.assertEqual([('key1',), ('key4',)], result_order)
700
def test__check_rebuild_no_changes(self):
    """Requesting every key must leave the manager's block untouched."""
    all_entries, source_block = self.make_block(self._texts)
    content_manager = groupcompress._LazyGroupContentManager(source_block)
    # Request all the keys, which ensures that we won't rebuild
    for name in ('key1', 'key2', 'key3', 'key4'):
        self.add_key_to_manager((name,), all_entries, source_block,
                                content_manager)
    content_manager._check_rebuild_block()
    # Identity, not equality: the very same block object must be kept.
    self.assertIs(source_block, content_manager._block)
711
def test__check_rebuild_only_one(self):
    """Requesting only the first key must swap in a smaller block.

    After ``_check_rebuild_block`` the manager has to hold a different
    block whose ``_content_length`` is smaller than the original's, and
    the record stream has to yield exactly ``('key1',)`` with its original
    fulltext.
    """
    entries, block = self.make_block(self._texts)
    manager = groupcompress._LazyGroupContentManager(block)
    # Request just the first key, which should trigger a 'strip' action
    self.add_key_to_manager(('key1',), entries, block, manager)
    manager._check_rebuild_block()
    self.assertIsNot(block, manager._block)
    self.assertTrue(block._content_length > manager._block._content_length)
    # We should still be able to get the content out of this block, though
    # it should only have 1 entry.  Each record is checked while the stream
    # is being consumed; the keys are collected so that an empty stream
    # cannot pass vacuously.
    result_order = []
    for record in manager.get_record_stream():
        result_order.append(record.key)
        self.assertEqual(('key1',), record.key)
        self.assertEqual(self._texts[record.key],
                         record.get_bytes_as('fulltext'))
    self.assertEqual([('key1',)], result_order)
726
def test__check_rebuild_middle(self):
    """Requesting only a middle key must swap in a smaller block.

    After ``_check_rebuild_block`` the manager has to hold a different
    block whose ``_content_length`` is smaller than the original's, and
    the record stream has to yield exactly ``('key4',)`` with its original
    fulltext.
    """
    entries, block = self.make_block(self._texts)
    manager = groupcompress._LazyGroupContentManager(block)
    # Requesting a small key in the middle should trigger a 'rebuild'
    self.add_key_to_manager(('key4',), entries, block, manager)
    manager._check_rebuild_block()
    self.assertIsNot(block, manager._block)
    self.assertTrue(block._content_length > manager._block._content_length)
    # Each record is checked while the stream is being consumed; the keys
    # are collected so that an empty stream cannot pass vacuously.
    result_order = []
    for record in manager.get_record_stream():
        result_order.append(record.key)
        self.assertEqual(('key4',), record.key)
        self.assertEqual(self._texts[record.key],
                         record.get_bytes_as('fulltext'))
    self.assertEqual([('key4',)], result_order)