~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/store/__init__.py

  • Committer: Aaron Bentley
  • Date: 2005-10-03 19:50:36 UTC
  • mfrom: (1399)
  • mto: (1185.25.1)
  • mto: This revision was merged to the branch mainline in revision 1419.
  • Revision ID: abentley@panoramicfeedback.com-20051003195036-28dbd56f0e852b08
Merged latest from Robert Collins

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
unique ID.
25
25
"""
26
26
 
27
 
import errno
28
 
import gzip
29
 
import os
30
 
import tempfile
31
 
import types
32
 
from stat import ST_SIZE
33
 
from StringIO import StringIO
 
27
from cStringIO import StringIO
34
28
 
35
 
from bzrlib.errors import BzrError, UnlistableStore
 
29
from bzrlib.errors import BzrError, UnlistableStore, TransportNotPossible
36
30
from bzrlib.trace import mutter
37
 
import bzrlib.ui
38
 
import bzrlib.osutils as osutils
39
 
 
 
31
import bzrlib.transport
 
32
from bzrlib.transport.local import LocalTransport
40
33
 
41
34
######################################################################
42
35
# stores
45
38
    pass
46
39
 
47
40
 
48
 
class ImmutableStore(object):
49
 
    """Store that holds files indexed by unique names.
50
 
 
 
41
class Store(object):
 
42
    """This class represents the abstract storage layout for saving information.
 
43
    
51
44
    Files can be added, but not modified once they are in.  Typically
52
45
    the hash is used as the name, or something else known to be unique,
53
46
    such as a UUID.
54
 
 
55
 
    >>> st = ImmutableScratchStore()
56
 
 
57
 
    >>> st.add(StringIO('hello'), 'aa')
58
 
    >>> 'aa' in st
59
 
    True
60
 
    >>> 'foo' in st
61
 
    False
62
 
 
63
 
    You are not allowed to add an id that is already present.
64
 
 
65
 
    Entries can be retrieved as files, which may then be read.
66
 
 
67
 
    >>> st.add(StringIO('goodbye'), '123123')
68
 
    >>> st['123123'].read()
69
 
    'goodbye'
70
 
 
71
 
    TODO: Atomic add by writing to a temporary file and renaming.
72
 
 
73
 
    In bzr 0.0.5 and earlier, files within the store were marked
74
 
    readonly on disk.  This is no longer done but existing stores need
75
 
    to be accomodated.
76
47
    """
77
48
 
78
 
    def __init__(self, basedir):
79
 
        self._basedir = basedir
80
 
 
81
 
    def _path(self, entry_id):
82
 
        if not isinstance(entry_id, basestring):
83
 
            raise TypeError(type(entry_id))
84
 
        if '\\' in entry_id or '/' in entry_id:
85
 
            raise ValueError("invalid store id %r" % entry_id)
86
 
        return os.path.join(self._basedir, entry_id)
87
 
 
88
 
    def __repr__(self):
89
 
        return "%s(%r)" % (self.__class__.__name__, self._basedir)
90
 
 
91
 
    def add(self, f, fileid, compressed=True):
92
 
        """Add contents of a file into the store.
93
 
 
94
 
        f -- An open file, or file-like object."""
95
 
        # FIXME: Only works on files that will fit in memory
96
 
        
97
 
        from bzrlib.atomicfile import AtomicFile
98
 
        
99
 
        mutter("add store entry %r" % (fileid))
100
 
        if isinstance(f, types.StringTypes):
101
 
            content = f
102
 
        else:
103
 
            content = f.read()
104
 
            
105
 
        p = self._path(fileid)
106
 
        if os.access(p, os.F_OK) or os.access(p + '.gz', os.F_OK):
107
 
            raise BzrError("store %r already contains id %r" % (self._basedir, fileid))
108
 
 
109
 
        fn = p
110
 
        if compressed:
111
 
            fn = fn + '.gz'
112
 
            
113
 
        af = AtomicFile(fn, 'wb')
114
 
        try:
115
 
            if compressed:
116
 
                gf = gzip.GzipFile(mode='wb', fileobj=af)
117
 
                gf.write(content)
118
 
                gf.close()
 
49
    def __len__(self):
 
50
        raise NotImplementedError('Children should define their length')
 
51
 
 
52
    def __getitem__(self, fileid):
 
53
        """Returns a file reading from a particular entry."""
 
54
        raise NotImplementedError
 
55
 
 
56
    def __contains__(self, fileid):
 
57
        """"""
 
58
        raise NotImplementedError
 
59
 
 
60
    def __iter__(self):
 
61
        raise NotImplementedError
 
62
 
 
63
    def add(self, f, fileid):
 
64
        """Add a file object f to the store accessible from the given fileid"""
 
65
        raise NotImplementedError('Children of Store must define their method of adding entries.')
 
66
 
 
67
    def add_multi(self, entries):
 
68
        """Add a series of file-like or string objects to the store with the given
 
69
        identities.
 
70
        
 
71
        :param entries: A list of tuples of file,id pairs [(file1, id1), (file2, id2), ...]
 
72
                        This could also be a generator yielding (file,id) pairs.
 
73
        """
 
74
        for f, fileid in entries:
 
75
            self.add(f, fileid)
 
76
 
 
77
    def has(self, fileids):
 
78
        """Return True/False for each entry in fileids.
 
79
 
 
80
        :param fileids: A List or generator yielding file ids.
 
81
        :return: A generator or list returning True/False for each entry.
 
82
        """
 
83
        for fileid in fileids:
 
84
            if fileid in self:
 
85
                yield True
119
86
            else:
120
 
                af.write(content)
121
 
            af.commit()
122
 
        finally:
123
 
            af.close()
124
 
 
125
 
 
126
 
    def copy_multi(self, other, ids, permit_failure=False):
 
87
                yield False
 
88
 
 
89
    def get(self, fileids, permit_failure=False, pb=None):
 
90
        """Return a set of files, one for each requested entry.
 
91
        
 
92
        :param permit_failure: If true, return None for entries which do not 
 
93
                               exist.
 
94
        :return: A list or generator of file-like objects, one for each id.
 
95
        """
 
96
        for fileid in fileids:
 
97
            try:
 
98
                yield self[fileid]
 
99
            except KeyError:
 
100
                if permit_failure:
 
101
                    yield None
 
102
                else:
 
103
                    raise
 
104
 
 
105
    def copy_multi(self, other, ids, pb=None, permit_failure=False):
127
106
        """Copy texts for ids from other into self.
128
107
 
129
 
        If an id is present in self, it is skipped.
 
108
        If an id is present in self, it is skipped.  A count of copied
 
109
        ids is returned, which may be less than len(ids).
130
110
 
131
 
        Returns (count_copied, failed), where failed is a collection of ids
132
 
        that could not be copied.
 
111
        :param other: Another Store object
 
112
        :param ids: A list of entry ids to be copied
 
113
        :param pb: A ProgressBar object, if none is given, the default will be created.
 
114
        :param permit_failure: Allow missing entries to be ignored
 
115
        :return: (n_copied, [failed]) The number of entries copied successfully,
 
116
            followed by a list of entries which could not be copied (because they
 
117
            were missing)
133
118
        """
134
 
        pb = bzrlib.ui.ui_factory.progress_bar()
135
 
        
 
119
        if pb is None:
 
120
            pb = bzrlib.ui.ui_factory.progress_bar()
 
121
 
 
122
        # XXX: Is there any reason why we couldn't make this accept a generator
 
123
        # and build a list as it finds things to copy?
 
124
        ids = list(ids) # Make sure we don't have a generator, since we iterate 2 times
136
125
        pb.update('preparing to copy')
137
 
        to_copy = [id for id in ids if id not in self]
138
 
        if isinstance(other, ImmutableStore):
139
 
            return self.copy_multi_immutable(other, to_copy, pb, 
140
 
                                             permit_failure=permit_failure)
141
 
        count = 0
 
126
        to_copy = []
 
127
        for file_id, has in zip(ids, self.has(ids)):
 
128
            if not has:
 
129
                to_copy.append(file_id)
 
130
        return self._do_copy(other, to_copy, pb, permit_failure=permit_failure)
 
131
 
 
132
    def _do_copy(self, other, to_copy, pb, permit_failure=False):
 
133
        """This is the standard copying mechanism, just get them one at
 
134
        a time from remote, and store them locally.
 
135
 
 
136
        :param other: Another Store object
 
137
        :param to_copy: A list of entry ids to copy
 
138
        :param pb: A ProgressBar object to display completion status.
 
139
        :param permit_failure: Allow missing entries to be ignored
 
140
        :return: (n_copied, [failed])
 
141
            The number of entries copied, and a list of failed entries.
 
142
        """
 
143
        # This should be updated to use add_multi() rather than
 
144
        # the current methods of buffering requests.
 
145
        # One question, is it faster to queue up 1-10 and then copy 1-10
 
146
        # then queue up 11-20, copy 11-20
 
147
        # or to queue up 1-10, copy 1, queue 11, copy 2, etc?
 
148
        # sort of pipeline versus batch.
 
149
 
 
150
        # We can't use self._transport.copy_to because we don't know
 
151
        # whether the local tree is in the same format as other
142
152
        failed = set()
143
 
        for id in to_copy:
144
 
            count += 1
145
 
            pb.update('copy', count, len(to_copy))
146
 
            if not permit_failure:
147
 
                self.add(other[id], id)
148
 
            else:
 
153
        def buffer_requests():
 
154
            count = 0
 
155
            buffered_requests = []
 
156
            for fileid in to_copy:
149
157
                try:
150
 
                    entry = other[id]
 
158
                    f = other[fileid]
151
159
                except KeyError:
152
 
                    failed.add(id)
153
 
                    continue
154
 
                self.add(entry, id)
155
 
                
156
 
        if not permit_failure:
 
160
                    if permit_failure:
 
161
                        failed.add(fileid)
 
162
                        continue
 
163
                    else:
 
164
                        raise
 
165
 
 
166
                buffered_requests.append((f, fileid))
 
167
                if len(buffered_requests) > self._max_buffered_requests:
 
168
                    yield buffered_requests.pop(0)
 
169
                    count += 1
 
170
                    pb.update('copy', count, len(to_copy))
 
171
 
 
172
            for req in buffered_requests:
 
173
                yield req
 
174
                count += 1
 
175
                pb.update('copy', count, len(to_copy))
 
176
 
157
177
            assert count == len(to_copy)
158
 
        pb.clear()
159
 
        return count, failed
160
 
 
161
 
    def copy_multi_immutable(self, other, to_copy, pb, permit_failure=False):
162
 
        count = 0
163
 
        failed = set()
164
 
        for id in to_copy:
165
 
            p = self._path(id)
166
 
            other_p = other._path(id)
167
 
            try:
168
 
                osutils.link_or_copy(other_p, p)
169
 
            except (IOError, OSError), e:
170
 
                if e.errno == errno.ENOENT:
171
 
                    if not permit_failure:
172
 
                        osutils.link_or_copy(other_p+".gz", p+".gz")
173
 
                    else:
174
 
                        try:
175
 
                            osutils.link_or_copy(other_p+".gz", p+".gz")
176
 
                        except IOError, e:
177
 
                            if e.errno == errno.ENOENT:
178
 
                                failed.add(id)
179
 
                            else:
180
 
                                raise
181
 
                else:
182
 
                    raise
183
 
            
184
 
            count += 1
185
 
            pb.update('copy', count, len(to_copy))
186
 
        assert count == len(to_copy)
187
 
        pb.clear()
188
 
        return count, failed
189
 
    
 
178
 
 
179
        self.add_multi(buffer_requests())
 
180
 
 
181
        pb.clear()
 
182
        return len(to_copy), failed
 
183
 
 
184
 
 
185
class TransportStore(Store):
 
186
    """A TransportStore is a Store superclass for Stores that use Transports."""
 
187
 
 
188
    _max_buffered_requests = 10
 
189
 
 
190
    def __init__(self, transport):
 
191
        assert isinstance(transport, bzrlib.transport.Transport)
 
192
        super(TransportStore, self).__init__()
 
193
        self._transport = transport
 
194
 
 
195
    def __repr__(self):
 
196
        if self._transport is None:
 
197
            return "%s(None)" % (self.__class__.__name__)
 
198
        else:
 
199
            return "%s(%r)" % (self.__class__.__name__, self._transport.base)
 
200
 
 
201
    __str__ = __repr__
 
202
 
 
203
 
 
204
class ImmutableMemoryStore(Store):
 
205
    """A memory only store."""
190
206
 
191
207
    def __contains__(self, fileid):
192
 
        """"""
193
 
        p = self._path(fileid)
194
 
        return (os.access(p, os.R_OK)
195
 
                or os.access(p + '.gz', os.R_OK))
196
 
 
197
 
    # TODO: Guard against the same thing being stored twice,
198
 
    # compressed and uncompressed
199
 
 
200
 
    def __iter__(self):
201
 
        for f in os.listdir(self._basedir):
202
 
            if f[-3:] == '.gz':
203
 
                # TODO: case-insensitive?
204
 
                yield f[:-3]
205
 
            else:
206
 
                yield f
207
 
 
208
 
    def __len__(self):
209
 
        return len(os.listdir(self._basedir))
210
 
 
 
208
        return self._contents.has_key(fileid)
 
209
 
 
210
    def __init__(self):
 
211
        super(ImmutableMemoryStore, self).__init__()
 
212
        self._contents = {}
 
213
 
 
214
    def add(self, stream, fileid, compressed=True):
 
215
        if self._contents.has_key(fileid):
 
216
            raise StoreError("fileid %s already in the store" % fileid)
 
217
        self._contents[fileid] = stream.read()
211
218
 
212
219
    def __getitem__(self, fileid):
213
220
        """Returns a file reading from a particular entry."""
214
 
        p = self._path(fileid)
215
 
        try:
216
 
            return gzip.GzipFile(p + '.gz', 'rb')
217
 
        except IOError, e:
218
 
            if e.errno != errno.ENOENT:
219
 
                raise
220
 
 
221
 
        try:
222
 
            return file(p, 'rb')
223
 
        except IOError, e:
224
 
            if e.errno != errno.ENOENT:
225
 
                raise
226
 
 
227
 
        raise KeyError(fileid)
228
 
 
 
221
        if not self._contents.has_key(fileid):
 
222
            raise IndexError
 
223
        return StringIO(self._contents[fileid])
 
224
 
 
225
    def _item_size(self, fileid):
 
226
        return len(self._contents[fileid])
 
227
 
 
228
    def __iter__(self):
 
229
        return iter(self._contents.keys())
229
230
 
230
231
    def total_size(self):
231
 
        """Return (count, bytes)
232
 
 
233
 
        This is the (compressed) size stored on disk, not the size of
234
 
        the content."""
235
 
        total = 0
 
232
        result = 0
236
233
        count = 0
237
 
        for fid in self:
 
234
        for fileid in self:
238
235
            count += 1
239
 
            p = self._path(fid)
240
 
            try:
241
 
                total += os.stat(p)[ST_SIZE]
242
 
            except OSError:
243
 
                total += os.stat(p + '.gz')[ST_SIZE]
244
 
                
245
 
        return count, total
246
 
 
247
 
 
248
 
 
249
 
 
250
 
class ImmutableScratchStore(ImmutableStore):
251
 
    """Self-destructing test subclass of ImmutableStore.
252
 
 
253
 
    The Store only exists for the lifetime of the Python object.
254
 
 Obviously you should not put anything precious in it.
 
236
            result += self._item_size(fileid)
 
237
        return count, result
 
238
        
 
239
 
 
240
class CachedStore(Store):
 
241
    """A store that caches data locally, to avoid repeated downloads.
 
242
    The precacache method should be used to avoid server round-trips for
 
243
    every piece of data.
255
244
    """
256
 
    def __init__(self):
257
 
        ImmutableStore.__init__(self, tempfile.mkdtemp())
258
 
 
259
 
    def __del__(self):
260
 
        for f in os.listdir(self._basedir):
261
 
            fpath = os.path.join(self._basedir, f)
262
 
            # needed on windows, and maybe some other filesystems
263
 
            os.chmod(fpath, 0600)
264
 
            os.remove(fpath)
265
 
        os.rmdir(self._basedir)
266
 
        mutter("%r destroyed" % self)
 
245
 
 
246
    def __init__(self, store, cache_dir):
 
247
        super(CachedStore, self).__init__()
 
248
        self.source_store = store
 
249
        # This clones the source store type with a locally bound
 
250
        # transport. FIXME: it assumes a constructor is == cloning.
 
251
        # clonable store - it might be nicer to actually have a clone()
 
252
        # or something. RBC 20051003
 
253
        self.cache_store = store.__class__(LocalTransport(cache_dir))
 
254
 
 
255
    def __getitem__(self, id):
 
256
        mutter("Cache add %s" % id)
 
257
        if id not in self.cache_store:
 
258
            self.cache_store.add(self.source_store[id], id)
 
259
        return self.cache_store[id]
 
260
 
 
261
    def __contains__(self, fileid):
 
262
        if fileid in self.cache_store:
 
263
            return True
 
264
        if fileid in self.source_store:
 
265
            # We could copy at this time
 
266
            return True
 
267
        return False
 
268
 
 
269
    def get(self, fileids, permit_failure=False, pb=None):
 
270
        fileids = list(fileids)
 
271
        hasids = self.cache_store.has(fileids)
 
272
        needs = set()
 
273
        for has, fileid in zip(hasids, fileids):
 
274
            if not has:
 
275
                needs.add(fileid)
 
276
        if needs:
 
277
            self.cache_store.copy_multi(self.source_store, needs,
 
278
                    permit_failure=permit_failure)
 
279
        return self.cache_store.get(fileids,
 
280
                permit_failure=permit_failure, pb=pb)
 
281
 
 
282
    def prefetch(self, ids):
 
283
        """Copy a series of ids into the cache, before they are used.
 
284
        For remote stores that support pipelining or async downloads, this can
 
285
        increase speed considerably.
 
286
 
 
287
        Failures while prefetching are ignored.
 
288
        """
 
289
        mutter("Prefetch of ids %s" % ",".join(ids))
 
290
        self.cache_store.copy_multi(self.source_store, ids, 
 
291
                                    permit_failure=True)
 
292
 
267
293
 
268
294
def copy_all(store_from, store_to):
269
295
    """Copy all ids from one store to another."""
 
296
    # TODO: Optional progress indicator
270
297
    if not hasattr(store_from, "__iter__"):
271
298
        raise UnlistableStore(store_from)
272
 
    ids = [f for f in store_from]
 
299
    try:
 
300
        ids = [f for f in store_from]
 
301
    except (NotImplementedError, TransportNotPossible):
 
302
        raise UnlistableStore(store_from)
273
303
    store_to.copy_multi(store_from, ids)
 
304