Source code for CADETProcess.optimization.cache
from collections import defaultdict
from pathlib import Path
import shutil
import tempfile
from diskcache import Cache
from CADETProcess import CADETProcessError
from CADETProcess.dataStructure import DillDisk
[docs]
class ResultsCache():
"""Cache to store (intermediate) results.
Optinally uses diskcache library to store large objects in sqlite database.
Internal structure:
[evaluation_object][step][x]
For example:
[EvaluationObject 1][Evaluator 1][x] -> IntermediateResults 1
[EvaluationObject 1][Evaluator 2][x] -> IntermediateResults 2
[EvaluationObject 1][Objective 1][x] -> f1.1
[EvaluationObject 1][Objective 2][x] -> f1.2
[EvaluationObject 1][Constraint 1][x] -> g1.1
[EvaluationObject 2][Evaluator 1][x] -> IntermediateResults 1
[EvaluationObject 2][Evaluator 2][x] -> IntermediateResults 2
[EvaluationObject 2][Objective 1][x] -> f2.1
[EvaluationObject 2][Objective 2][x] -> f2.2
[EvaluationObject 2][Constraint 1][x] -> g2.1
[None][Evaluator 1][x] -> IntermediateResults 1
[Objective 3][x] -> f3
[Constraint 2][x] -> g2
See Also
--------
CADETProcess.optimization.OptimizationProblem.add_evaluator
"""
def __init__(self, use_diskcache=False, directory=None):
self.use_diskcache = use_diskcache
self.directory = directory
self.init_cache()
self.tags = defaultdict(list)
[docs]
def init_cache(self):
"""Initialize ResultsCache."""
if self.use_diskcache:
if self.directory is None:
self.directory = Path(tempfile.mkdtemp(prefix='diskcache-'))
if self.directory.exists():
shutil.rmtree(self.directory, ignore_errors=True)
self.directory.mkdir(exist_ok=True, parents=True)
self.cache = Cache(
self.directory.as_posix(),
disk=DillDisk,
disk_min_file_size=2**18, # 256 kb
size_limit=2**36, # 64 GB
tag_index=True,
)
self.directory = self.cache.directory
else:
self.cache = {}
[docs]
def set(self, key, value, tag=None, close=True):
"""Add entry to cache.
Parameters
----------
key : hashable
The key to retrieve the results for.
value : object
The value corresponding to the key.
tag : str, optional
Tag to associate with result. The default is None.
"""
if self.use_diskcache:
self.cache.set(key, value, tag=tag, expire=None)
else:
if tag is not None:
self.tags[tag].append(key)
self.cache[key] = value
if close:
self.close()
[docs]
def get(self, key, close=True):
"""Get entry from cache.
Parameters
----------
key : hashable
The key to retrieve the results for.
Returns
-------
value : object
The value corresponding to the key.
"""
value = self.cache[key]
if close:
self.close()
return value
[docs]
def delete(self, key, close=True):
"""Remove entry from cache.
Parameters
----------
key : hashable
The key to retrieve the results for.
value : object
The value corresponding to the key.
"""
if self.use_diskcache:
found = self.cache.delete(key)
if not found:
raise KeyError(key)
else:
self.cache.pop(key)
if close:
self.close()
[docs]
def prune(self, tag):
"""Remove tagged entries from cache.
Parameters
----------
tag : str, optional
Tag to be removed. The default is 'temp'.
"""
if self.use_diskcache:
self.cache.evict(tag)
else:
keys = self.tags.pop(tag, [])
for key in keys:
try:
self.delete(key, close=False)
except KeyError:
pass
self.close()
[docs]
def close(self):
"""Close cache."""
if self.use_diskcache:
self.cache.close()
[docs]
def delete_database(self, reinit=False):
"""Delte database.
Parameters
----------
reinit : bool, optional
If True, reinitialize cache. The default is False.
"""
self.close()
if self.use_diskcache:
try:
shutil.rmtree(self.directory, ignore_errors=True)
except FileNotFoundError:
pass
if reinit:
self.init_cache()