Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import contextlib | |
| import enum | |
| import json | |
| import logging | |
| import struct | |
| from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple, Type | |
| import pandas as pd | |
| import sqlalchemy | |
| from langchain.docstore.document import Document | |
| from langchain.schema.embeddings import Embeddings | |
| from langchain.utils import get_from_dict_or_env | |
| from langchain.vectorstores.base import VectorStore | |
| from pgvector.sqlalchemy import Vector | |
| from sqlalchemy import delete, text | |
| from sqlalchemy.orm import Session, declarative_base | |
class DistanceStrategy(str, enum.Enum):
    """Distance metrics that can be selected for vector comparison.

    The enum mixes in ``str``, so every member is also a string equal to
    its value.
    """

    # Each member is resolved to a comparator of the embedding column by
    # ``CustomPGVector.distance_strategy``.
    EUCLIDEAN = "l2"  # -> ``l2_distance``
    COSINE = "cosine"  # -> ``cosine_distance``
    MAX_INNER_PRODUCT = "inner"  # -> ``max_inner_product``
# Metric used when a caller does not pass ``distance_strategy``.
DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.COSINE

# Declarative base that the ORM models in this module derive from.
Base = declarative_base() # type: Any

# Default ``table_name`` of ``from_texts``, ``from_embeddings`` and
# ``from_existing_index``.
_LANGCHAIN_DEFAULT_COLLECTION_NAME = "langchain"
| def _results_to_docs(docs_and_scores: Any) -> List[Document]: | |
| """Return docs from docs and scores.""" | |
| return [doc for doc, _ in docs_and_scores] | |
class Article(Base):
    """ORM model of the ``article`` table, which also stores the embeddings."""

    __tablename__ = "article"

    # Integer primary key.
    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, nullable=False)
    # Optional free-text fields.
    title = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    abstract = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    # The Python attribute is ``embedding`` but the database column is named
    # ``abstract_embedding``; ``Vector(None)`` is given no fixed dimension.
    embedding: Vector = sqlalchemy.Column("abstract_embedding", Vector(None))
    doi = sqlalchemy.Column(sqlalchemy.String, nullable=True)
class CustomPGVector(VectorStore):
    """LangChain vector store over the ``article`` table and its search index.

    Similarity search runs a raw SQL statement (see ``__query_collection``)
    that ranks rows of ``vss_article`` with ``vss_search`` and joins the
    matches to ``article``, ``keyword`` and ``author``. Inserts and deletes go
    through the ``Article`` ORM model.

    Args:
        connection: Open SQLAlchemy connection used for every statement.
        embedding_function: Object implementing the LangChain ``Embeddings``
            interface; used to embed queries and added texts.
        table_name: Stored on the instance. The statements in this class use
            fixed table names and do not read it.
        column_name: Stored on the instance. The statements in this class do
            not read it.
        collection_metadata: Stored on the instance only.
        distance_strategy: Metric returned by ``distance_strategy()``; the raw
            SQL search does not consult it. (default: COSINE)
        pre_delete_collection: Stored on the instance only.
        logger: Logger to use; defaults to this module's logger.

    Example:
        .. code-block:: python

            store = CustomPGVector(
                connection=connection,
                embedding_function=embeddings,
                table_name="article",
                column_name="abstract_embedding",
            )
            docs = store.similarity_search("graph neural networks", k=3)
    """

    # Fallback for factory methods that are not handed a ``column_name``.
    # NOTE(review): the spelling matches the historical ``from_documents``
    # default; the value is stored but never read by this class.
    _DEFAULT_COLUMN_NAME = "embeding"

    def __init__(
        self,
        connection: sqlalchemy.engine.Connection,
        embedding_function: Embeddings,
        table_name: str,
        column_name: str,
        collection_metadata: Optional[dict] = None,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        pre_delete_collection: bool = False,
        logger: Optional[logging.Logger] = None,
    ) -> None:
        """Store the configuration and bind the ORM model (see class docstring)."""
        self._conn = connection
        self.embedding_function = embedding_function
        self.table_name = table_name
        self.column_name = column_name
        self.collection_metadata = collection_metadata
        self._distance_strategy = distance_strategy
        self.pre_delete_collection = pre_delete_collection
        self.logger = logger or logging.getLogger(__name__)
        self.__post_init__()

    def __post_init__(
        self,
    ) -> None:
        """Bind the ORM model used by ``delete``, ``add_embeddings`` and
        ``distance_strategy``."""
        self.EmbeddingStore = Article

    @property
    def embeddings(self) -> Embeddings:
        """Embedding object of this store.

        Exposed as a property because the ``VectorStore`` base class declares
        ``embeddings`` as one.
        """
        return self.embedding_function

    def drop_tables(self) -> None:
        """Drop every table registered on ``Base.metadata`` in one transaction."""
        with self._conn.begin():
            Base.metadata.drop_all(self._conn)

    @contextlib.contextmanager
    def _make_session(self) -> Generator[Session, None, None]:
        """Yield a session bound to the store's connection and close it on exit."""
        with Session(self._conn) as session:
            yield session

    def delete(
        self,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Delete articles by primary key.

        Args:
            ids: Primary-key values of the rows to delete. ``None`` deletes
                nothing.
        """
        if ids is None:
            return
        with Session(self._conn) as session:
            self.logger.debug(
                "Trying to delete articles by ids (matched against the "
                "primary key)"
            )
            # ``Article`` has no ``custom_id`` column; ``id`` is its only key.
            stmt = delete(self.EmbeddingStore).where(
                self.EmbeddingStore.id.in_(ids)
            )
            session.execute(stmt)
            session.commit()

    @classmethod
    def _resolve_connection(
        cls,
        kwargs: Dict[str, Any],
        connection_string: Optional[str] = None,
    ) -> sqlalchemy.engine.Connection:
        """Return the connection a factory method should hand to ``__init__``.

        A ready ``connection`` entry in ``kwargs`` wins. Otherwise a new
        connection is opened from ``connection_string`` (or, when that is
        ``None``, from ``get_connection_string``). Both keys are removed from
        ``kwargs`` so they are not forwarded to ``__init__`` a second time.
        """
        connection = kwargs.pop("connection", None)
        if connection is None:
            if connection_string is None:
                connection_string = cls.get_connection_string(kwargs)
            connection = sqlalchemy.create_engine(connection_string).connect()
        kwargs.pop("connection_string", None)
        return connection

    @classmethod
    def __from(
        cls,
        texts: List[str],
        embeddings: List[List[float]],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        table_name: str = "article",
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        connection_string: Optional[str] = None,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> CustomPGVector:
        """Build a store and insert the given texts with their embeddings.

        Shared implementation of ``from_texts`` and ``from_embeddings``.
        ``column_name`` and ``connection`` are taken from ``kwargs`` when
        present; the remaining ``kwargs`` go to the constructor and to
        ``add_embeddings``, as before.
        """
        if not metadatas:
            metadatas = [{} for _ in texts]
        column_name = kwargs.pop("column_name", cls._DEFAULT_COLUMN_NAME)
        connection = cls._resolve_connection(kwargs, connection_string)
        # ``__init__`` takes ``connection`` and requires ``column_name``;
        # passing ``connection_string=`` raised TypeError.
        store = cls(
            connection=connection,
            embedding_function=embedding,
            table_name=table_name,
            column_name=column_name,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
            **kwargs,
        )
        store.add_embeddings(
            texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
        )
        return store

    def add_embeddings(
        self,
        texts: Iterable[str],
        embeddings: List[List[float]],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Insert one ``Article`` row per text/embedding pair.

        Each text is stored as the row's ``abstract`` and each vector as its
        ``embedding``; ``title`` and ``doi`` are read from the matching
        metadata dict when present.

        NOTE(review): searches read the ``vss_article`` index, which this
        method does not update - confirm how that index is refreshed.

        Args:
            texts: Strings to store.
            embeddings: One vector per text.
            metadatas: Optional dicts aligned with ``texts``.
            ids: Optional primary-key values aligned with ``texts``. When
                omitted the database assigns the keys.
            kwargs: Accepted for interface compatibility; unused.

        Returns:
            Primary keys of the inserted rows, as strings.
        """
        # Materialise once: ``texts`` may be a one-shot iterator and is
        # needed both for sizing the defaults and for the insert loop.
        texts = list(texts)
        if not metadatas:
            metadatas = [{} for _ in texts]
        row_ids = list(ids) if ids is not None else [None] * len(texts)
        rows = []
        with Session(self._conn) as session:
            for content, metadata, vector, row_id in zip(
                texts, metadatas, embeddings, row_ids
            ):
                row = self.EmbeddingStore(
                    id=row_id,
                    title=metadata.get("title"),
                    abstract=content,
                    embedding=vector,
                    doi=metadata.get("doi"),
                )
                session.add(row)
                rows.append(row)
            # Flush before reading ``id`` so database-assigned keys are set.
            session.flush()
            stored_ids = [str(row.id) for row in rows]
            session.commit()
        return stored_ids

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Embed the texts and add them to the store.

        Args:
            texts: Strings to embed and store.
            metadatas: Optional dicts aligned with ``texts``.
            ids: Optional primary-key values aligned with ``texts``.
            kwargs: Forwarded to ``add_embeddings``.

        Returns:
            Primary keys of the inserted rows, as strings.
        """
        # Materialise once so embedding and insertion see the same items.
        texts = list(texts)
        embeddings = self.embedding_function.embed_documents(texts)
        return self.add_embeddings(
            texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
        )

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[dict] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return the documents most similar to a text query.

        Args:
            query: Query text to embed and search for.
            k: Number of results to return. Defaults to 4.
            filter: Accepted for interface compatibility; not applied.

        Returns:
            Documents ordered by increasing distance.
        """
        embedding = self.embedding_function.embed_query(text=query)
        return self.similarity_search_by_vector(
            embedding=embedding,
            k=k,
        )

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[dict] = None,
    ) -> List[Tuple[Document, float]]:
        """Return the documents most similar to a text query, with distances.

        Args:
            query: Query text to embed and search for.
            k: Number of results to return. Defaults to 4.
            filter: Accepted for interface compatibility; not applied.

        Returns:
            ``(document, distance)`` pairs ordered by increasing distance.
        """
        embedding = self.embedding_function.embed_query(query)
        return self.similarity_search_with_score_by_vector(embedding=embedding, k=k)

    def distance_strategy(self) -> Any:
        """Return the comparator of the embedding column for the chosen metric.

        NOTE(review): kept as a plain method, as written in this file;
        confirm whether callers expect a property instead.

        Raises:
            ValueError: If the configured strategy is not a known member.
        """
        if self._distance_strategy == DistanceStrategy.EUCLIDEAN:
            return self.EmbeddingStore.embedding.l2_distance
        elif self._distance_strategy == DistanceStrategy.COSINE:
            return self.EmbeddingStore.embedding.cosine_distance
        elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
            return self.EmbeddingStore.embedding.max_inner_product
        else:
            raise ValueError(
                f"Got unexpected value for distance: {self._distance_strategy}. "
                f"Should be one of {', '.join([ds.value for ds in DistanceStrategy])}."
            )

    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
    ) -> List[Tuple[Document, float]]:
        """Return ``(document, distance)`` pairs for the ``k`` nearest rows."""
        results = self.__query_collection(embedding=embedding, k=k)
        return self._results_to_docs_and_scores(results)

    def _results_to_docs_and_scores(self, results: Any) -> List[Tuple[Document, float]]:
        """Convert query rows to ``(Document, distance)`` pairs.

        Each document's ``page_content`` is a JSON object holding the row's
        abstract, id, title, authors, doi, keywords and distance.
        """
        with_score = self.embedding_function is not None
        docs_and_scores = []
        for result in results:
            payload = {
                "abstract": result["abstract"],
                "id": result["id"],
                "title": result["title"],
                "authors": result["authors"],
                "doi": result["doi"],
                "keywords": result["keywords"],
                "distance": result["distance"],
            }
            score = result["distance"] if with_score else None
            docs_and_scores.append(
                (Document(page_content=json.dumps(payload)), score)
            )
        return docs_and_scores

    def __query_collection(
        self,
        embedding: List[float],
        k: int = 4,
    ) -> List[Any]:
        """Run the nearest-neighbour query and return one dict per article.

        Args:
            embedding: Query vector; sent as packed native floats.
            k: Maximum number of index matches to fetch.

        Returns:
            Dicts with the keys ``id``, ``title``, ``doi``, ``abstract``,
            ``keywords``, ``authors`` and ``distance``, ordered by distance.
        """
        vector = bytearray(struct.pack("f" * len(embedding), *embedding))
        cursor = self._conn.execute(
            # ``limit`` is bound to ``k``; it used to be hard-coded to 5.
            # ``distinct`` is needed because joining keywords and authors
            # independently multiplies the rows of each article.
            text("""
                with matches as (
                    select
                        rowid,
                        distance
                    from vss_article
                    where vss_search(
                        abstract_embedding,
                        :vector
                    )
                    limit :limit
                )
                select
                    article.id,
                    article.title,
                    article.doi,
                    article.abstract,
                    group_concat(distinct keyword."name") as keywords,
                    group_concat(distinct author."name") as authors,
                    matches.distance
                from matches
                left join article on matches.rowid = article.rowid
                left join article_keyword ak ON ak.article_id = article.id
                left join keyword on ak.keyword_id = keyword.id
                left join article_author ON article_author.article_id = article.id
                left join author on author.id = article_author.author_id
                group by article.id
                order by distance;
            """),
            {"vector": vector, "limit": k},
        )
        columns = (
            "id",
            "title",
            "doi",
            "abstract",
            "keywords",
            "authors",
            "distance",
        )
        # Plain dicts keep NULLs as None and numbers as native Python values,
        # so the rows stay JSON-serialisable.
        return [dict(zip(columns, row)) for row in cursor.fetchall()]

    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        **kwargs: Any,
    ) -> List[Document]:
        """Return the documents most similar to an embedding vector.

        Args:
            embedding: Vector to look up similar documents for.
            k: Number of documents to return. Defaults to 4.
            kwargs: Accepted for interface compatibility; unused.

        Returns:
            Documents ordered by increasing distance.
        """
        docs_and_scores = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k
        )
        return _results_to_docs(docs_and_scores)

    @classmethod
    def from_texts(
        cls: Type[CustomPGVector],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        table_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> CustomPGVector:
        """Embed the texts and return a store containing them.

        A database handle is required: pass ``connection`` or
        ``connection_string`` as a keyword argument, or set the
        PGVECTOR_CONNECTION_STRING environment variable.
        """
        embeddings = embedding.embed_documents(list(texts))
        return cls.__from(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            table_name=table_name,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
            **kwargs,
        )

    @classmethod
    def from_embeddings(
        cls,
        text_embeddings: List[Tuple[str, List[float]]],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        table_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> CustomPGVector:
        """Return a store built from texts with pre-computed embeddings.

        A database handle is required: pass ``connection`` or
        ``connection_string`` as a keyword argument, or set the
        PGVECTOR_CONNECTION_STRING environment variable.

        Example:
            .. code-block:: python

                text_embeddings = embeddings.embed_documents(texts)
                text_embedding_pairs = list(zip(texts, text_embeddings))
                store = CustomPGVector.from_embeddings(
                    text_embedding_pairs, embeddings, connection=connection
                )
        """
        texts = [pair[0] for pair in text_embeddings]
        embeddings = [pair[1] for pair in text_embeddings]
        return cls.__from(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            table_name=table_name,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
            **kwargs,
        )

    @classmethod
    def from_existing_index(
        cls: Type[CustomPGVector],
        embedding: Embeddings,
        table_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> CustomPGVector:
        """Return a store over existing data without inserting anything.

        ``connection`` (or ``connection_string``) and ``column_name`` are read
        from ``kwargs``; other entries are ignored.
        """
        column_name = kwargs.pop("column_name", cls._DEFAULT_COLUMN_NAME)
        connection = cls._resolve_connection(kwargs)
        # ``__init__`` takes ``connection`` and requires ``column_name``;
        # passing ``connection_string=`` raised TypeError.
        return cls(
            connection=connection,
            embedding_function=embedding,
            table_name=table_name,
            column_name=column_name,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
        )

    @classmethod
    def get_connection_string(cls, kwargs: Dict[str, Any]) -> str:
        """Read the connection string from ``kwargs`` or the environment.

        Looks up ``kwargs["connection_string"]`` first and falls back to the
        PGVECTOR_CONNECTION_STRING environment variable.

        Raises:
            ValueError: If the resolved value is empty.
        """
        connection_string: str = get_from_dict_or_env(
            data=kwargs,
            key="connection_string",
            env_key="PGVECTOR_CONNECTION_STRING",
        )
        if not connection_string:
            # The literals were previously joined without separating spaces.
            raise ValueError(
                "Postgres connection string is required. "
                "Either pass it as a parameter "
                "or set the PGVECTOR_CONNECTION_STRING environment variable."
            )
        return connection_string

    @classmethod
    def from_documents(
        cls: Type[CustomPGVector],
        documents: List[Document],
        embedding: Embeddings,
        table_name: str = "article",
        column_name: str = "embeding",
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> CustomPGVector:
        """Embed the documents and return a store containing them.

        Each document's ``page_content`` becomes the text and its ``metadata``
        the metadata dict. A database handle is required: pass ``connection``
        or ``connection_string`` as a keyword argument, or set the
        PGVECTOR_CONNECTION_STRING environment variable.
        """
        texts = [document.page_content for document in documents]
        metadatas = [document.metadata for document in documents]
        # Only resolve a connection string when no ready connection was
        # given; the lookup raises if neither source provides one.
        if kwargs.get("connection") is None:
            kwargs["connection_string"] = cls.get_connection_string(kwargs)
        return cls.from_texts(
            texts=texts,
            pre_delete_collection=pre_delete_collection,
            embedding=embedding,
            distance_strategy=distance_strategy,
            metadatas=metadatas,
            ids=ids,
            table_name=table_name,
            column_name=column_name,
            **kwargs,
        )