Hamza Amri

Reputation: 21

Updating Data in PGVector While Inserting New Data

I'm currently using PostgreSQL with PGVector (through LangChain) and need help managing data insertion efficiently. My goal is to store data in the vector store while ensuring two conditions are met:

  1. Skip inserting data that already exists.
  2. Update data for existing IDs.

Here's a snippet of my code:

cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'langchain_pg_embedding')")
table_exists = cursor.fetchone()[0]

if not table_exists:
    print("Vectorstore does not exist in the database.")
    print("Creating Database ...")

    db = PGVector.from_documents(
        embedding=embeddings,
        documents=chunks,
        collection_name=COLLECTION_NAME,
        connection_string=CONNECTION_STRING
    )
    print("Database created successfully")
else:
    print("Vectorstore already exists in the database.")
    print("Checking data ...")

    # Check if the ID already exists in the database
    for chunk in chunks:
        cursor.execute("SELECT * FROM langchain_pg_embedding WHERE langchain_pg_embedding.cmetadata ->> 'id' = %s", (chunk.metadata["id"],))
        result = cursor.fetchall()

        if result:
            print(f"ID {chunk.metadata['id']} already exists in the database.")
            print(result)        

        else:
            print(f"Inserting ID {chunk.metadata['id']} into the database.")
            # Insert the chunk into the database
            db = PGVector.from_documents(
                embedding=embeddings,
                documents=[chunk],
                collection_name=COLLECTION_NAME,
                connection_string=CONNECTION_STRING
            )
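
For the update case (condition 2), a minimal sketch of what I have in mind would replace the `if result:` branch with a delete-then-reinsert, reusing the same from_documents call as above. This assumes the psycopg2 connection behind `cursor` is available as `conn`; it's a rough idea, not tested code:

        if result:
            print(f"ID {chunk.metadata['id']} already exists in the database. Updating ...")
            # Delete the stale embedding row(s) for this ID before re-inserting
            # (`conn` is assumed to be the psycopg2 connection the cursor came from)
            cursor.execute(
                "DELETE FROM langchain_pg_embedding WHERE cmetadata ->> 'id' = %s",
                (chunk.metadata["id"],),
            )
            conn.commit()
            # Re-insert the fresh chunk with the same call used for new IDs
            db = PGVector.from_documents(
                embedding=embeddings,
                documents=[chunk],
                collection_name=COLLECTION_NAME,
                connection_string=CONNECTION_STRING
            )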

Upvotes: 1

Views: 1154

Answers (1)

mike00

Reputation: 448

I don't have a solution for your exact code, but what I'm using for that kind of upsert operation is LangChain's index function:

from langchain.indexes import SQLRecordManager, index

CONNECTION_STRING = "postgresql+psycopg2://admin:[email protected]:9432/vectordb"
COLLECTION_NAME = "vectordb"

namespace = f"pgvector/{COLLECTION_NAME}"
record_manager = SQLRecordManager(
    namespace, db_url=CONNECTION_STRING
)

record_manager.create_schema()

vectorstore = PGVector.from_documents(
    docs,
    embeddings,
    collection_name=COLLECTION_NAME,
    connection_string=CONNECTION_STRING,
)

index(docs, record_manager, vectorstore, cleanup="incremental", source_id_key="source")

After you call index, it checks all embeddings using the metadata 'source' field as the key; if they are already in the database, it deletes them and inserts the new ones. You can play with the different cleanup options such as 'incremental', 'full', etc.
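For example, switching the same call to the 'full' clean-up mode looks like this (a minimal sketch assuming the same docs, record_manager and vectorstore as above):

# With cleanup="full", documents that were indexed earlier but are missing
# from the current `docs` batch are deleted as well, not just changed ones.
index(docs, record_manager, vectorstore, cleanup="full", source_id_key="source")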

Upvotes: 1
