~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/store.py

  • Committer: Martin Pool
  • Date: 2005-09-05 05:35:25 UTC
  • mfrom: (974.1.55)
  • Revision ID: mbp@sourcefrog.net-20050905053525-2112bac069dbe331
- merge various bug fixes from aaron

aaron.bentley@utoronto.ca-20050905020131-a2d5b7711dd6cd98

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
# TODO: Could remember a bias towards whether a particular store is typically
18
 
# compressed or not.
19
 
 
20
17
"""
21
18
Stores are the main data-storage mechanism for Bazaar-NG.
22
19
 
24
21
unique ID.
25
22
"""
26
23
 
27
 
from cStringIO import StringIO
28
 
from stat import ST_MODE, S_ISDIR, ST_SIZE
29
 
from zlib import adler32
30
 
 
31
 
import bzrlib.errors as errors
32
 
from bzrlib.errors import BzrError, UnlistableStore, TransportNotPossible
 
24
import os, tempfile, types, osutils, gzip, errno
 
25
from stat import ST_SIZE
 
26
from StringIO import StringIO
 
27
from bzrlib.errors import BzrError
33
28
from bzrlib.trace import mutter
34
 
import bzrlib.transport
35
 
from bzrlib.transport.local import LocalTransport
 
29
import bzrlib.ui
36
30
 
37
31
######################################################################
38
32
# stores
41
35
    pass
42
36
 
43
37
 
44
 
class Store(object):
45
 
    """This class represents the abstract storage layout for saving information.
46
 
    
 
38
class ImmutableStore(object):
 
39
    """Store that holds files indexed by unique names.
 
40
 
47
41
    Files can be added, but not modified once they are in.  Typically
48
42
    the hash is used as the name, or something else known to be unique,
49
43
    such as a UUID.
 
44
 
 
45
    >>> st = ImmutableScratchStore()
 
46
 
 
47
    >>> st.add(StringIO('hello'), 'aa')
 
48
    >>> 'aa' in st
 
49
    True
 
50
    >>> 'foo' in st
 
51
    False
 
52
 
 
53
    You are not allowed to add an id that is already present.
 
54
 
 
55
    Entries can be retrieved as files, which may then be read.
 
56
 
 
57
    >>> st.add(StringIO('goodbye'), '123123')
 
58
    >>> st['123123'].read()
 
59
    'goodbye'
 
60
 
 
61
    TODO: Atomic add by writing to a temporary file and renaming.
 
62
 
 
63
    In bzr 0.0.5 and earlier, files within the store were marked
 
64
    readonly on disk.  This is no longer done but existing stores need
 
65
    to be accomodated.
50
66
    """
51
67
 
52
 
    def __len__(self):
53
 
        raise NotImplementedError('Children should define their length')
54
 
 
55
 
    def get(self, file_id):
56
 
        """Returns a file reading from a particular entry."""
57
 
 
58
 
    def __getitem__(self, fileid):
59
 
        """DEPRECATED. Please use .get(file_id) instead."""
60
 
        raise NotImplementedError
61
 
 
62
 
    def __contains__(self, fileid):
63
 
        """"""
64
 
        raise NotImplementedError
65
 
 
66
 
    def __iter__(self):
67
 
        raise NotImplementedError
68
 
 
69
 
    def add(self, f, fileid):
70
 
        """Add a file object f to the store accessible from the given fileid"""
71
 
        raise NotImplementedError('Children of Store must define their method of adding entries.')
72
 
 
73
 
    def add_multi(self, entries):
74
 
        """Add a series of file-like or string objects to the store with the given
75
 
        identities.
76
 
        
77
 
        :param entries: A list of tuples of file,id pairs [(file1, id1), (file2, id2), ...]
78
 
                        This could also be a generator yielding (file,id) pairs.
79
 
        """
80
 
        for f, fileid in entries:
81
 
            self.add(f, fileid)
82
 
 
83
 
    def has(self, fileids):
84
 
        """Return True/False for each entry in fileids.
85
 
 
86
 
        :param fileids: A List or generator yielding file ids.
87
 
        :return: A generator or list returning True/False for each entry.
88
 
        """
89
 
        for fileid in fileids:
90
 
            if fileid in self:
91
 
                yield True
 
68
    def __init__(self, basedir):
 
69
        self._basedir = basedir
 
70
 
 
71
    def _path(self, id):
 
72
        if '\\' in id or '/' in id:
 
73
            raise ValueError("invalid store id %r" % id)
 
74
        return os.path.join(self._basedir, id)
 
75
 
 
76
    def __repr__(self):
 
77
        return "%s(%r)" % (self.__class__.__name__, self._basedir)
 
78
 
 
79
    def add(self, f, fileid, compressed=True):
 
80
        """Add contents of a file into the store.
 
81
 
 
82
        f -- An open file, or file-like object."""
 
83
        # FIXME: Only works on files that will fit in memory
 
84
        
 
85
        from bzrlib.atomicfile import AtomicFile
 
86
        
 
87
        mutter("add store entry %r" % (fileid))
 
88
        if isinstance(f, types.StringTypes):
 
89
            content = f
 
90
        else:
 
91
            content = f.read()
 
92
            
 
93
        p = self._path(fileid)
 
94
        if os.access(p, os.F_OK) or os.access(p + '.gz', os.F_OK):
 
95
            raise BzrError("store %r already contains id %r" % (self._basedir, fileid))
 
96
 
 
97
        fn = p
 
98
        if compressed:
 
99
            fn = fn + '.gz'
 
100
            
 
101
        af = AtomicFile(fn, 'wb')
 
102
        try:
 
103
            if compressed:
 
104
                gf = gzip.GzipFile(mode='wb', fileobj=af)
 
105
                gf.write(content)
 
106
                gf.close()
92
107
            else:
93
 
                yield False
94
 
 
95
 
    def listable(self):
96
 
        """Return True if this store is able to be listed."""
97
 
        return hasattr(self, "__iter__")
98
 
 
99
 
    def copy_multi(self, other, ids, pb=None, permit_failure=False):
 
108
                af.write(content)
 
109
            af.commit()
 
110
        finally:
 
111
            af.close()
 
112
 
 
113
 
 
114
    def copy_multi(self, other, ids, permit_failure=False):
100
115
        """Copy texts for ids from other into self.
101
116
 
102
 
        If an id is present in self, it is skipped.  A count of copied
103
 
        ids is returned, which may be less than len(ids).
 
117
        If an id is present in self, it is skipped.
104
118
 
105
 
        :param other: Another Store object
106
 
        :param ids: A list of entry ids to be copied
107
 
        :param pb: A ProgressBar object, if none is given, the default will be created.
108
 
        :param permit_failure: Allow missing entries to be ignored
109
 
        :return: (n_copied, [failed]) The number of entries copied successfully,
110
 
            followed by a list of entries which could not be copied (because they
111
 
            were missing)
 
119
        Returns (count_copied, failed), where failed is a collection of ids
 
120
        that could not be copied.
112
121
        """
113
 
        if pb is None:
114
 
            pb = bzrlib.ui.ui_factory.progress_bar()
115
 
 
116
 
        # XXX: Is there any reason why we couldn't make this accept a generator
117
 
        # and build a list as it finds things to copy?
118
 
        ids = list(ids) # Make sure we don't have a generator, since we iterate 2 times
 
122
        pb = bzrlib.ui.ui_factory.progress_bar()
 
123
        
119
124
        pb.update('preparing to copy')
120
 
        to_copy = []
121
 
        for file_id, has in zip(ids, self.has(ids)):
122
 
            if not has:
123
 
                to_copy.append(file_id)
124
 
        return self._do_copy(other, to_copy, pb, permit_failure=permit_failure)
125
 
 
126
 
    def _do_copy(self, other, to_copy, pb, permit_failure=False):
127
 
        """This is the standard copying mechanism, just get them one at
128
 
        a time from remote, and store them locally.
129
 
 
130
 
        :param other: Another Store object
131
 
        :param to_copy: A list of entry ids to copy
132
 
        :param pb: A ProgressBar object to display completion status.
133
 
        :param permit_failure: Allow missing entries to be ignored
134
 
        :return: (n_copied, [failed])
135
 
            The number of entries copied, and a list of failed entries.
136
 
        """
137
 
        # This should be updated to use add_multi() rather than
138
 
        # the current methods of buffering requests.
139
 
        # One question, is it faster to queue up 1-10 and then copy 1-10
140
 
        # then queue up 11-20, copy 11-20
141
 
        # or to queue up 1-10, copy 1, queue 11, copy 2, etc?
142
 
        # sort of pipeline versus batch.
143
 
 
144
 
        # We can't use self._transport.copy_to because we don't know
145
 
        # whether the local tree is in the same format as other
 
125
        to_copy = [id for id in ids if id not in self]
 
126
        if isinstance(other, ImmutableStore):
 
127
            return self.copy_multi_immutable(other, to_copy, pb)
 
128
        count = 0
146
129
        failed = set()
147
 
        def buffer_requests():
148
 
            count = 0
149
 
            buffered_requests = []
150
 
            for fileid in to_copy:
 
130
        for id in to_copy:
 
131
            count += 1
 
132
            pb.update('copy', count, len(to_copy))
 
133
            if not permit_failure:
 
134
                self.add(other[id], id)
 
135
            else:
151
136
                try:
152
 
                    f = other.get(fileid)
153
 
                except KeyError:
154
 
                    if permit_failure:
155
 
                        failed.add(fileid)
156
 
                        continue
 
137
                    entry = other[id]
 
138
                except IndexError:
 
139
                    failed.add(id)
 
140
                    continue
 
141
                self.add(entry, id)
 
142
                
 
143
        if not permit_failure:
 
144
            assert count == len(to_copy)
 
145
        pb.clear()
 
146
        return count, failed
 
147
 
 
148
    def copy_multi_immutable(self, other, to_copy, pb, permit_failure=False):
 
149
        from shutil import copyfile
 
150
        count = 0
 
151
        failed = set()
 
152
        for id in to_copy:
 
153
            p = self._path(id)
 
154
            other_p = other._path(id)
 
155
            try:
 
156
                copyfile(other_p, p)
 
157
            except IOError, e:
 
158
                if e.errno == errno.ENOENT:
 
159
                    if not permit_failure:
 
160
                        copyfile(other_p+".gz", p+".gz")
157
161
                    else:
158
 
                        raise
159
 
 
160
 
                buffered_requests.append((f, fileid))
161
 
                if len(buffered_requests) > self._max_buffered_requests:
162
 
                    yield buffered_requests.pop(0)
163
 
                    count += 1
164
 
                    pb.update('copy', count, len(to_copy))
165
 
 
166
 
            for req in buffered_requests:
167
 
                yield req
168
 
                count += 1
169
 
                pb.update('copy', count, len(to_copy))
170
 
 
171
 
            assert count == len(to_copy)
172
 
 
173
 
        self.add_multi(buffer_requests())
174
 
 
 
162
                        try:
 
163
                            copyfile(other_p+".gz", p+".gz")
 
164
                        except IOError, e:
 
165
                            if e.errno == errno.ENOENT:
 
166
                                failed.add(id)
 
167
                            else:
 
168
                                raise
 
169
                else:
 
170
                    raise
 
171
            
 
172
            count += 1
 
173
            pb.update('copy', count, len(to_copy))
 
174
        assert count == len(to_copy)
175
175
        pb.clear()
176
 
        return len(to_copy), failed
177
 
 
178
 
 
179
 
class TransportStore(Store):
180
 
    """A TransportStore is a Store superclass for Stores that use Transports."""
181
 
 
182
 
    _max_buffered_requests = 10
183
 
 
184
 
    def add(self, f, fileid, suffix=None):
185
 
        """Add contents of a file into the store.
186
 
 
187
 
        f -- A file-like object, or string
188
 
        """
189
 
        mutter("add store entry %r" % (fileid))
190
 
        
191
 
        if suffix is not None:
192
 
            fn = self._relpath(fileid, [suffix])
193
 
        else:
194
 
            fn = self._relpath(fileid)
195
 
        if self._transport.has(fn):
196
 
            raise BzrError("store %r already contains id %r" % (self._transport.base, fileid))
197
 
 
198
 
        if self._prefixed:
199
 
            try:
200
 
                self._transport.mkdir(hash_prefix(fileid))
201
 
            except errors.FileExists:
202
 
                pass
203
 
 
204
 
        self._add(fn, f)
205
 
 
206
 
    def _check_fileid(self, fileid):
207
 
        if not isinstance(fileid, basestring):
208
 
            raise TypeError('Fileids should be a string type: %s %r' % (type(fileid), fileid))
209
 
        if '\\' in fileid or '/' in fileid:
210
 
            raise ValueError("invalid store id %r" % fileid)
 
176
        return count, failed
 
177
    
211
178
 
212
179
    def __contains__(self, fileid):
213
 
        fn = self._relpath(fileid)
214
 
        return self._transport.has(fn)
215
 
 
216
 
    def _get(self, filename):
217
 
        """Return an vanilla file stream for clients to read from.
218
 
 
219
 
        This is the body of a template method on 'get', and should be 
220
 
        implemented by subclasses.
221
 
        """
222
 
        raise NotImplementedError
223
 
 
224
 
    def get(self, fileid):
 
180
        """"""
 
181
        p = self._path(fileid)
 
182
        return (os.access(p, os.R_OK)
 
183
                or os.access(p + '.gz', os.R_OK))
 
184
 
 
185
    # TODO: Guard against the same thing being stored twice, compressed and uncompresse
 
186
 
 
187
    def __iter__(self):
 
188
        for f in os.listdir(self._basedir):
 
189
            if f[-3:] == '.gz':
 
190
                # TODO: case-insensitive?
 
191
                yield f[:-3]
 
192
            else:
 
193
                yield f
 
194
 
 
195
    def __len__(self):
 
196
        return len(os.listdir(self._basedir))
 
197
 
 
198
 
 
199
    def __getitem__(self, fileid):
225
200
        """Returns a file reading from a particular entry."""
226
 
        fn = self._relpath(fileid)
227
 
        try:
228
 
            return self._get(fn)
229
 
        except errors.NoSuchFile:
230
 
            raise KeyError(fileid)
231
 
 
232
 
    def has(self, fileids, pb=None):
233
 
        """Return True/False for each entry in fileids.
234
 
 
235
 
        :param fileids: A List or generator yielding file ids.
236
 
        :return: A generator or list returning True/False for each entry.
237
 
        """
238
 
        relpaths = (self._relpath(fid) for fid in fileids)
239
 
        return self._transport.has_multi(relpaths, pb=pb)
240
 
 
241
 
    def __init__(self, transport, prefixed=False):
242
 
        assert isinstance(transport, bzrlib.transport.Transport)
243
 
        super(TransportStore, self).__init__()
244
 
        self._transport = transport
245
 
        self._prefixed = prefixed
246
 
 
247
 
    def __len__(self):
248
 
        return len(list(self._iter_relpath()))
249
 
 
250
 
    def _relpath(self, fileid, suffixes=[]):
251
 
        self._check_fileid(fileid)
252
 
        for suffix in suffixes:
253
 
            self._check_fileid(suffix)
254
 
        if self._prefixed:
255
 
            path = [hash_prefix(fileid) + fileid]
256
 
        else:
257
 
            path = [fileid]
258
 
        path.extend(suffixes)
259
 
        return '.'.join(path)
260
 
 
261
 
    def __repr__(self):
262
 
        if self._transport is None:
263
 
            return "%s(None)" % (self.__class__.__name__)
264
 
        else:
265
 
            return "%s(%r)" % (self.__class__.__name__, self._transport.base)
266
 
 
267
 
    __str__ = __repr__
268
 
 
269
 
    def _iter_relpaths(self):
270
 
        """Iter the relative paths of files in the transports sub-tree."""
271
 
        transport = self._transport
272
 
        queue = list(transport.list_dir('.'))
273
 
        while queue:
274
 
            relpath = queue.pop(0)
275
 
            st = transport.stat(relpath)
276
 
            if S_ISDIR(st[ST_MODE]):
277
 
                for i, basename in enumerate(transport.list_dir(relpath)):
278
 
                    queue.insert(i, relpath+'/'+basename)
279
 
            else:
280
 
                yield relpath, st
281
 
 
282
 
    def listable(self):
283
 
        """Return True if this store is able to be listed."""
284
 
        return self._transport.listable()
 
201
        p = self._path(fileid)
 
202
        try:
 
203
            return gzip.GzipFile(p + '.gz', 'rb')
 
204
        except IOError, e:
 
205
            if e.errno != errno.ENOENT:
 
206
                raise
 
207
 
 
208
        try:
 
209
            return file(p, 'rb')
 
210
        except IOError, e:
 
211
            if e.errno != errno.ENOENT:
 
212
                raise
 
213
 
 
214
        raise IndexError(fileid)
 
215
 
285
216
 
286
217
    def total_size(self):
287
218
        """Return (count, bytes)
290
221
        the content."""
291
222
        total = 0
292
223
        count = 0
293
 
        for relpath, st in self._iter_relpaths():
 
224
        for fid in self:
294
225
            count += 1
295
 
            total += st[ST_SIZE]
 
226
            p = self._path(fid)
 
227
            try:
 
228
                total += os.stat(p)[ST_SIZE]
 
229
            except OSError:
 
230
                total += os.stat(p + '.gz')[ST_SIZE]
296
231
                
297
232
        return count, total
298
233
 
299
234
 
300
 
class ImmutableMemoryStore(Store):
301
 
    """A memory only store."""
302
 
 
303
 
    def __contains__(self, fileid):
304
 
        return self._contents.has_key(fileid)
305
 
 
 
235
 
 
236
 
 
237
class ImmutableScratchStore(ImmutableStore):
 
238
    """Self-destructing test subclass of ImmutableStore.
 
239
 
 
240
    The Store only exists for the lifetime of the Python object.
 
241
 Obviously you should not put anything precious in it.
 
242
    """
306
243
    def __init__(self):
307
 
        super(ImmutableMemoryStore, self).__init__()
308
 
        self._contents = {}
309
 
 
310
 
    def add(self, stream, fileid, compressed=True):
311
 
        if self._contents.has_key(fileid):
312
 
            raise StoreError("fileid %s already in the store" % fileid)
313
 
        self._contents[fileid] = stream.read()
314
 
 
315
 
    def get(self, fileid):
316
 
        """Returns a file reading from a particular entry."""
317
 
        if not self._contents.has_key(fileid):
318
 
            raise IndexError
319
 
        return StringIO(self._contents[fileid])
320
 
 
321
 
    def _item_size(self, fileid):
322
 
        return len(self._contents[fileid])
323
 
 
324
 
    def __iter__(self):
325
 
        return iter(self._contents.keys())
326
 
 
327
 
    def total_size(self):
328
 
        result = 0
329
 
        count = 0
330
 
        for fileid in self:
331
 
            count += 1
332
 
            result += self._item_size(fileid)
333
 
        return count, result
334
 
        
335
 
 
336
 
class CachedStore(Store):
337
 
    """A store that caches data locally, to avoid repeated downloads.
338
 
    The precacache method should be used to avoid server round-trips for
339
 
    every piece of data.
340
 
    """
341
 
 
342
 
    def __init__(self, store, cache_dir):
343
 
        super(CachedStore, self).__init__()
344
 
        self.source_store = store
345
 
        # This clones the source store type with a locally bound
346
 
        # transport. FIXME: it assumes a constructor is == cloning.
347
 
        # clonable store - it might be nicer to actually have a clone()
348
 
        # or something. RBC 20051003
349
 
        self.cache_store = store.__class__(LocalTransport(cache_dir))
350
 
 
351
 
    def get(self, id):
352
 
        mutter("Cache add %s" % id)
353
 
        if id not in self.cache_store:
354
 
            self.cache_store.add(self.source_store.get(id), id)
355
 
        return self.cache_store.get(id)
356
 
 
357
 
    def __contains__(self, fileid):
358
 
        if fileid in self.cache_store:
359
 
            return True
360
 
        if fileid in self.source_store:
361
 
            # We could copy at this time
362
 
            return True
363
 
        return False
364
 
 
365
 
    def prefetch(self, ids):
366
 
        """Copy a series of ids into the cache, before they are used.
367
 
        For remote stores that support pipelining or async downloads, this can
368
 
        increase speed considerably.
369
 
 
370
 
        Failures while prefetching are ignored.
371
 
        """
372
 
        mutter("Prefetch of ids %s" % ",".join(ids))
373
 
        self.cache_store.copy_multi(self.source_store, ids, 
374
 
                                    permit_failure=True)
375
 
 
376
 
 
377
 
def copy_all(store_from, store_to):
378
 
    """Copy all ids from one store to another."""
379
 
    # TODO: Optional progress indicator
380
 
    if not store_from.listable():
381
 
        raise UnlistableStore(store_from)
382
 
    ids = [f for f in store_from]
383
 
    store_to.copy_multi(store_from, ids)
384
 
 
385
 
def hash_prefix(file_id):
386
 
    return "%02x/" % (adler32(file_id) & 0xff)
387
 
 
 
244
        ImmutableStore.__init__(self, tempfile.mkdtemp())
 
245
 
 
246
    def __del__(self):
 
247
        for f in os.listdir(self._basedir):
 
248
            fpath = os.path.join(self._basedir, f)
 
249
            # needed on windows, and maybe some other filesystems
 
250
            os.chmod(fpath, 0600)
 
251
            os.remove(fpath)
 
252
        os.rmdir(self._basedir)
 
253
        mutter("%r destroyed" % self)