from abc import abstractmethod, ABCMeta
import pickle
from typing import List, Any, Optional, Union
import cachetools
import numpy as np
import requests
from gptcache.manager.eviction import EvictionBase
from gptcache.utils.error import CacheError, ParamError
from gptcache.manager.scalar_data.base import (
CacheStorage,
CacheData,
DataType,
Answer,
Question
)
from gptcache.manager.vector_data.base import VectorBase, VectorData
from gptcache.manager.object_data.base import ObjectBase
from gptcache.manager.eviction_manager import EvictionManager
from gptcache.utils.log import gptcache_log
[docs]class DataManager(metaclass=ABCMeta):
"""DataManager manage the cache data, including save and search"""
@abstractmethod
def save(self, question, answer, embedding_data, **kwargs):
pass
@abstractmethod
def import_data(
self, questions: List[Any], answers: List[Any], embedding_datas: List[Any]
):
pass
@abstractmethod
def get_scalar_data(self, res_data, **kwargs) -> CacheData:
pass
def hit_cache_callback(self, res_data, **kwargs):
pass
@abstractmethod
def search(self, embedding_data, **kwargs):
pass
@abstractmethod
def close(self):
pass
[docs]class MapDataManager(DataManager):
"""MapDataManager, store all data in a map data structure.
:param data_path: the path to save the map data, defaults to 'data_map.txt'.
:type data_path: str
:param max_size: the max size for the cache, defaults to 1000.
:type max_size: int
:param get_data_container: a Callable to get the data container, defaults to None.
:type get_data_container: Callable
Example:
.. code-block:: python
from gptcache.manager import get_data_manager
data_manager = get_data_manager("data_map.txt", 1000)
"""
def __init__(self, data_path, max_size, get_data_container=None):
if get_data_container is None:
self.data = cachetools.LRUCache(max_size)
else:
self.data = get_data_container(max_size)
self.data_path = data_path
self.init()
def init(self):
try:
with open(self.data_path, "rb") as f:
self.data = pickle.load(f)
except FileNotFoundError:
return
except PermissionError:
raise CacheError( # pylint: disable=W0707
f"You don't have permission to access this file <{self.data_path}>."
)
def save(self, question, answer, embedding_data, **kwargs):
self.data[embedding_data] = (question, answer, embedding_data)
def import_data(
self, questions: List[Any], answers: List[Any], embedding_datas: List[Any]
):
if len(questions) != len(answers) or len(questions) != len(embedding_datas):
raise ParamError("Make sure that all parameters have the same length")
for i, embedding_data in enumerate(embedding_datas):
self.data[embedding_data] = (questions[i], answers[i], embedding_datas[i])
def get_scalar_data(self, res_data, **kwargs) -> CacheData:
return CacheData(question=res_data[0], answers=res_data[1])
def search(self, embedding_data, **kwargs):
try:
return [self.data[embedding_data]]
except KeyError:
return []
def close(self):
try:
with open(self.data_path, "wb") as f:
pickle.dump(self.data, f)
except PermissionError:
gptcache_log.error(
"You don't have permission to access this file %s.", self.data_path
)
def normalize(vec):
magnitude = np.linalg.norm(vec)
normalized_v = vec / magnitude
return normalized_v
[docs]class SSDataManager(DataManager):
"""Generate SSDataManage to manager the data.
:param s: CacheStorage to manager the scalar data, it can be generated with :meth:`gptcache.manager.CacheBase`.
:type s: CacheStorage
:param v: VectorBase to manager the vector data, it can be generated with :meth:`gptcache.manager.VectorBase`.
:type v: VectorBase
:param max_size: the max size for the cache, defaults to 1000.
:type max_size: int
:param clean_size: the size to clean up, defaults to `max_size * 0.2`.
:type clean_size: int
:param eviction: The eviction policy, it is support "LRU" and "FIFO" now, and defaults to "LRU".
:type eviction: str
"""
def __init__(
self,
s: CacheStorage,
v: VectorBase,
o: Optional[ObjectBase],
max_size,
clean_size,
policy="LRU",
):
self.max_size = max_size
self.clean_size = clean_size
self.s = s
self.v = v
self.o = o
self.eviction_base = EvictionBase(
name="memory",
policy=policy,
maxsize=max_size,
clean_size=clean_size,
on_evict=self._clear,
)
self.eviction_base.put(self.s.get_ids(deleted=False))
self.eviction_manager = EvictionManager(self.s, self.v)
def _clear(self, marked_keys):
self.eviction_manager.soft_evict(marked_keys)
if self.eviction_manager.check_evict():
self.eviction_manager.delete()
[docs] def save(self, question, answer, embedding_data, **kwargs):
"""Save the data and vectors to cache and vector storage.
:param question: question data.
:type question: str
:param answer: answer data.
:type answer: str, Answer or (Any, DataType)
:param embedding_data: vector data.
:type embedding_data: np.ndarray
Example:
.. code-block:: python
import numpy as np
from gptcache.manager import get_data_manager, CacheBase, VectorBase
data_manager = get_data_manager(CacheBase('sqlite'), VectorBase('faiss', dimension=128))
data_manager.save('hello', 'hi', np.random.random((128, )).astype('float32'))
"""
self.import_data([question], [answer], [embedding_data])
def _process_answer_data(self, answers: Union[Answer, List[Answer]]):
if isinstance(answers, Answer):
answers = [answers]
new_ans = []
for ans in answers:
if ans.answer_type != DataType.STR:
new_ans.append(Answer(self.o.put(ans.answer), ans.answer_type))
else:
new_ans.append(ans)
return new_ans
def _process_question_data(self, question: Union[str, Question]):
if isinstance(question, Question):
if question.deps is None:
return question
for dep in question.deps:
if dep.dep_type == DataType.IMAGE_URL:
dep.dep_type.data = self.o.put(requests.get(dep.data).content)
return question
return Question(question)
def import_data(
self, questions: List[Any], answers: List[Answer], embedding_datas: List[Any]
):
if len(questions) != len(answers) or len(questions) != len(embedding_datas):
raise ParamError("Make sure that all parameters have the same length")
cache_datas = []
embedding_datas = [
normalize(embedding_data) for embedding_data in embedding_datas
]
for i, embedding_data in enumerate(embedding_datas):
if self.o is not None:
ans = self._process_answer_data(answers[i])
else:
ans = answers[i]
cache_datas.append(
CacheData(
question=self._process_question_data(questions[i]),
answers=ans,
embedding_data=embedding_data.astype("float32"),
)
)
ids = self.s.batch_insert(cache_datas)
self.v.mul_add(
[
VectorData(id=ids[i], data=embedding_data)
for i, embedding_data in enumerate(embedding_datas)
]
)
self.eviction_base.put(ids)
def get_scalar_data(self, res_data, **kwargs) -> CacheData:
cache_data = self.s.get_data_by_id(res_data[1])
for ans in cache_data.answers:
if ans.answer_type != DataType.STR:
ans.answer = self.o.get(ans.answer)
return cache_data
def hit_cache_callback(self, res_data, **kwargs):
self.eviction_base.get(res_data[1])
def search(self, embedding_data, **kwargs):
embedding_data = normalize(embedding_data)
top_k = kwargs.get("top_k", -1)
return self.v.search(data=embedding_data, top_k=top_k)
def close(self):
self.s.close()
self.v.close()