Hybrid Search with FastEmbed & Qdrant
Author: Nirant Kasliwal
What will we do?
This notebook demonstrates the usage of Hybrid Search with FastEmbed & Qdrant.
- Setup: Download and install the required dependencies
- Preview data: Load and preview the data
- Create Sparse Embeddings: Create SPLADE++ embeddings for the data
- Create Dense Embeddings: Create BGE-Large-en-v1.5 embeddings for the data
- Indexing: Index the embeddings using Qdrant
- Search: Perform Hybrid Search using FastEmbed & Qdrant
- Ranking: Rank the search results with Reciprocal Rank Fusion (RRF)
Setup
To get started, install the required dependencies:
!pip install -qU qdrant-client fastembed datasets transformers
import json
from typing import List, Tuple
import numpy as np
import pandas as pd
from datasets import load_dataset
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance,
NamedSparseVector,
NamedVector,
SparseVector,
PointStruct,
SearchRequest,
SparseIndexParams,
SparseVectorParams,
VectorParams,
ScoredPoint,
)
from transformers import AutoTokenizer
import fastembed
from fastembed import SparseEmbedding, SparseTextEmbedding, TextEmbedding
fastembed.__version__ # 0.2.5
dataset = load_dataset("tasksource/esci", split="train")
# We'll select the first 1000 examples for this demo
dataset = dataset.select(range(1000))
dataset = dataset.filter(lambda x: x["product_locale"] == "us")
dataset
Preview Data
source_df = dataset.to_pandas()
df = source_df.drop_duplicates(
subset=["product_text", "product_title", "product_bullet_point", "product_brand"]
)
df = df.dropna(subset=["product_text", "product_title", "product_bullet_point", "product_brand"])
df.head()
print(f"Catalog Item Count: {len(df)}\nQueries: {len(source_df)}")
df["combined_text"] = (
df["product_title"] + "\n" + df["product_text"] + "\n" + df["product_bullet_point"]
)
len(df)
Create Sparse Embeddings
sparse_model_name = "prithvida/Splade_PP_en_v1"
dense_model_name = "BAAI/bge-large-en-v1.5"
# This triggers the model download
sparse_model = SparseTextEmbedding(model_name=sparse_model_name, batch_size=32)
dense_model = TextEmbedding(model_name=dense_model_name, batch_size=32)
def make_sparse_embedding(texts: List[str]):
    return list(sparse_model.embed(texts, batch_size=32))
sparse_embedding: List[SparseEmbedding] = make_sparse_embedding(
["Fastembed is a great library for text embeddings!"]
)
sparse_embedding
The previous output is a SparseEmbedding object for the first document in our list.
It contains two arrays: values and indices.
- The 'values' array represents the weights of the features (tokens) in the document.
- The 'indices' array represents the indices of these features in the model's vocabulary.
Each pair of corresponding values and indices represents a token and its weight in the document.
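To make the values/indices layout concrete, here is a minimal sketch in plain Python. The toy vocabulary, the weights, and the helper names `to_sparse` and `sparse_dot` are made up for illustration; they are not part of FastEmbed or Qdrant.

```python
from typing import Dict, List, Tuple

# Hypothetical 6-token vocabulary; a real model's vocabulary is far larger.
toy_vocab = ["embed", "fast", "library", "search", "text", "vector"]


def to_sparse(weights: Dict[str, float]) -> Tuple[List[int], List[float]]:
    """Turn {token: weight} into parallel (indices, values) lists, skipping zeros."""
    pairs = sorted((toy_vocab.index(tok), w) for tok, w in weights.items() if w != 0.0)
    indices = [i for i, _ in pairs]
    values = [v for _, v in pairs]
    return indices, values


def sparse_dot(a, b) -> float:
    """Dot product of two sparse vectors: only shared indices contribute."""
    b_lookup = dict(zip(*b))  # index -> value for the second vector
    return sum(v * b_lookup.get(i, 0.0) for i, v in zip(*a))


doc = to_sparse({"fast": 1.5, "embed": 2.0, "library": 0.5})
query = to_sparse({"embed": 1.0, "search": 3.0})
print(doc)
print(sparse_dot(doc, query))
```

Only the token "embed" appears in both vectors, so it is the only one that contributes to the score. Tokens with zero weight are never stored, which is what keeps the representation compact.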
This is still a little abstract, so let's use the tokenizer vocab to make sense of these indices.
SparseTextEmbedding.list_supported_models()
def get_tokens_and_weights(sparse_embedding, model_name):
    # Find the tokenizer for the model
    tokenizer_source = None
    for model_info in SparseTextEmbedding.list_supported_models():
        if model_info["model"].lower() == model_name.lower():
            tokenizer_source = model_info["sources"]["hf"]
            break
    else:
        raise ValueError(f"Model {model_name} not found in the supported models.")
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_source)
    token_weight_dict = {}
    for i in range(len(sparse_embedding.indices)):
        token = tokenizer.decode([sparse_embedding.indices[i]])
        weight = sparse_embedding.values[i]
        token_weight_dict[token] = weight
    # Sort the dictionary by weights
    token_weight_dict = dict(
        sorted(token_weight_dict.items(), key=lambda item: item[1], reverse=True)
    )
    return token_weight_dict
# Test the function with the first SparseEmbedding
print(json.dumps(get_tokens_and_weights(sparse_embedding[0], sparse_model_name), indent=4))
Create Dense Embeddings
def make_dense_embedding(texts: List[str]):
    return list(dense_model.embed(texts))
dense_embedding = make_dense_embedding(["Fastembed is a great library for text embeddings!"])
dense_embedding[0].shape
product_texts = df["combined_text"].tolist()
%%time
df["sparse_embedding"] = make_sparse_embedding(product_texts)
Notice that FastEmbed uses data parallelism to speed up the embedding generation process.
This improves throughput and reduces the time it takes to generate embeddings for large datasets.
For our small dataset here, on my local machine, it cuts the time from about 6 min 15 s of total CPU ("user") time to a wall time of about 3 min 6 s, roughly 2x faster. The speedup depends on the number of CPU cores available on the machine, the current CPU load, and other factors, so your mileage may vary.
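The general idea of data parallelism can be sketched as follows: split the texts into batches, process the batches concurrently, and stitch the results back together in the original order. This is not FastEmbed's internal implementation. It uses threads for simplicity, and `fake_embed_batch` is a hypothetical stand-in for a real model call.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def batched(items: List[str], batch_size: int) -> List[List[str]]:
    """Split items into consecutive batches of at most batch_size."""
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]


def fake_embed_batch(batch: List[str]) -> List[List[float]]:
    """Stand-in for a model call: one 2-d 'embedding' per text."""
    return [[float(len(text)), float(text.count(" "))] for text in batch]


def embed_parallel(
    texts: List[str], embed_batch: Callable, batch_size: int = 2, workers: int = 4
) -> List[List[float]]:
    """Embed batches concurrently; pool.map() yields results in input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = pool.map(embed_batch, batched(texts, batch_size))
        return [vector for batch_result in results for vector in batch_result]


texts = ["a b", "cd", "e f g", "h", "ij k"]
vectors = embed_parallel(texts, fake_embed_batch)
print(vectors)
```

Because each batch is independent of the others, the work scales with the number of available workers until the CPU is saturated, which is why the wall time can be well below the total CPU time.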
df["sparse_embedding"]
%%time
df["dense_embedding"] = make_dense_embedding(product_texts)
Indexing
client = QdrantClient(":memory:")
About Qdrant
Qdrant is a vector similarity search engine that allows you to index and search high-dimensional vectors. It supports both sparse and dense embeddings, and it's a great tool for building search engines.
Here, we use the in-memory mode, which is NumPy under the hood, for demonstration purposes. In production, you can use the Docker image or Qdrant Cloud for full database support.
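Conceptually, a small in-memory vector search boils down to scoring the query against every stored vector and keeping the best matches. The sketch below shows that idea with cosine similarity in plain Python. It is an illustration only, not Qdrant's actual code, and the names `cosine`, `brute_force_search`, and `store` are hypothetical.

```python
import math
from typing import Dict, List, Tuple


def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def brute_force_search(
    query: List[float], store: Dict[int, List[float]], limit: int = 2
) -> List[Tuple[int, float]]:
    """Score every stored vector against the query and keep the top `limit`."""
    scored = [(point_id, cosine(query, vec)) for point_id, vec in store.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:limit]


# Toy 2-d vectors keyed by point id.
store = {0: [1.0, 0.0], 1: [0.0, 1.0], 2: [1.0, 1.0]}
hits = brute_force_search([1.0, 0.1], store)
print(hits)
```

An exhaustive scan like this is fine for a thousand products; for larger collections, a server deployment with a proper index is the more practical choice.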
collection_name = "esci"
client.create_collection(
    collection_name,
    vectors_config={
        "text-dense": VectorParams(
            size=1024,  # Dimension of BAAI/bge-large-en-v1.5 embeddings
            distance=Distance.COSINE,
        )
    },
    sparse_vectors_config={
        "text-sparse": SparseVectorParams(
            index=SparseIndexParams(
                on_disk=False,
            )
        )
    },
)
def make_points(df: pd.DataFrame) -> List[PointStruct]:
    sparse_vectors = df["sparse_embedding"].tolist()
    product_texts = df["combined_text"].tolist()
    dense_vectors = df["dense_embedding"].tolist()
    rows = df.to_dict(orient="records")
    points = []
    for idx, (text, sparse_vector, dense_vector) in enumerate(
        zip(product_texts, sparse_vectors, dense_vectors)
    ):
        sparse_vector = SparseVector(
            indices=sparse_vector.indices.tolist(), values=sparse_vector.values.tolist()
        )
        point = PointStruct(
            id=idx,
            payload={
                "text": text,
                "product_id": rows[idx]["product_id"],
            },  # Add any additional payload if necessary
            vector={
                "text-sparse": sparse_vector,
                "text-dense": dense_vector.tolist(),
            },
        )
        points.append(point)
    return points
points: List[PointStruct] = make_points(df)
client.upsert(collection_name, points)
Search
def search(query_text: str):
    # Compute sparse and dense vectors for the query
    query_sparse_vectors: List[SparseEmbedding] = make_sparse_embedding([query_text])
    query_dense_vector: List[np.ndarray] = make_dense_embedding([query_text])
    # Run both searches in one batch: results[0] is dense, results[1] is sparse
    search_results = client.search_batch(
        collection_name=collection_name,
        requests=[
            SearchRequest(
                vector=NamedVector(
                    name="text-dense",
                    vector=query_dense_vector[0].tolist(),
                ),
                limit=10,
                with_payload=True,
            ),
            SearchRequest(
                vector=NamedSparseVector(
                    name="text-sparse",
                    vector=SparseVector(
                        indices=query_sparse_vectors[0].indices.tolist(),
                        values=query_sparse_vectors[0].values.tolist(),
                    ),
                ),
                limit=10,
                with_payload=True,
            ),
        ],
    )
    return search_results
query_text = " revent 80 cfm"
search_results = search(query_text)
Ranking
We'll combine the results from the two models using Reciprocal Rank Fusion (RRF). You can read more about RRF here.
We select RRF for this task because:
1. It is a simple and effective method for combining search results.
2. It is robust to the differences in the ranking scores of the two or more ranking lists.
3. It is easy to implement and requires minimal tuning (only one parameter: alpha, which we don't tune here).
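The robustness point can be illustrated with a small sketch. The scores below are made up: the dense scores mimic cosine similarities in [0, 1], while the sparse scores mimic unbounded dot products. The helpers `to_ranks` and `rrf_scores` are a simplified, dictionary-based variant written for this illustration. Unlike the NumPy implementation in this notebook, an item missing from a list simply contributes nothing instead of receiving a default rank.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Tuple


def to_ranks(scores: Dict[Hashable, float]) -> List[Tuple[Hashable, int]]:
    """Order items by descending score and return (item, rank) pairs, ranks from 1."""
    ordered = sorted(scores, key=scores.get, reverse=True)
    return [(item, rank) for rank, item in enumerate(ordered, start=1)]


def rrf_scores(rank_lists, alpha: int = 60) -> Dict[Hashable, float]:
    """Sum 1 / (alpha + rank) over the lists in which an item appears."""
    fused = defaultdict(float)
    for rank_list in rank_lists:
        for item, rank in rank_list:
            fused[item] += 1.0 / (alpha + rank)
    return dict(fused)


# Made-up scores on very different scales.
dense_scores = {"A": 0.91, "B": 0.90, "C": 0.40}
sparse_scores = {"B": 14.2, "C": 13.9, "A": 2.1}

# Naive fusion: add raw scores, so the larger-scale list dominates.
summed = {item: dense_scores[item] + sparse_scores[item] for item in dense_scores}
# Rank-based fusion: only positions matter, not score magnitudes.
fused = rrf_scores([to_ranks(dense_scores), to_ranks(sparse_scores)])

print(sorted(summed, key=summed.get, reverse=True))
print(sorted(fused, key=fused.get, reverse=True))
```

With raw-score addition, the sparse scores swamp the dense ones and the final order simply mirrors the sparse list. With RRF, item A's first place in the dense list counts as much as B's first place in the sparse list, so both rankings influence the result.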
def rrf(rank_lists, alpha=60, default_rank=1000):
    """
    Optimized Reciprocal Rank Fusion (RRF) using NumPy for large rank lists.
    :param rank_lists: A list of rank lists. Each rank list should be a list of (item, rank) tuples.
    :param alpha: The parameter alpha used in the RRF formula. Default is 60.
    :param default_rank: The default rank assigned to items not present in a rank list. Default is 1000.
    :return: Sorted list of items based on their RRF scores.
    """
    # Consolidate all unique items from all rank lists
    all_items = set(item for rank_list in rank_lists for item, _ in rank_list)
    # Create a mapping of items to indices
    item_to_index = {item: idx for idx, item in enumerate(all_items)}
    # Initialize a matrix to hold the ranks, filled with the default rank
    rank_matrix = np.full((len(all_items), len(rank_lists)), default_rank)
    # Fill in the actual ranks from the rank lists
    for list_idx, rank_list in enumerate(rank_lists):
        for item, rank in rank_list:
            rank_matrix[item_to_index[item], list_idx] = rank
    # Calculate RRF scores using NumPy operations
    rrf_scores = np.sum(1.0 / (alpha + rank_matrix), axis=1)
    # Sort items based on RRF scores
    sorted_indices = np.argsort(-rrf_scores)  # Negative for descending order
    # Retrieve sorted items (build the index-to-item list once, not per lookup)
    index_to_item = list(item_to_index.keys())
    sorted_items = [(index_to_item[idx], rrf_scores[idx]) for idx in sorted_indices]
    return sorted_items
# Example usage
rank_list1 = [("A", 1), ("B", 2), ("C", 3)]
rank_list2 = [("B", 1), ("C", 2), ("D", 3)]
rank_list3 = [("A", 2), ("D", 1), ("E", 3)]
# Combine the rank lists
sorted_items = rrf([rank_list1, rank_list2, rank_list3])
sorted_items
Based on this, let's convert our sparse and dense results into rank lists. And then, we'll use the Reciprocal Rank Fusion (RRF) algorithm to combine them.
def rank_list(search_result: List[ScoredPoint]):
    return [(point.id, rank + 1) for rank, point in enumerate(search_result)]
dense_rank_list, sparse_rank_list = rank_list(search_results[0]), rank_list(search_results[1])
rrf_rank_list = rrf([dense_rank_list, sparse_rank_list])
rrf_rank_list
def find_point_by_id(
    client: QdrantClient, collection_name: str, rrf_rank_list: List[Tuple[int, float]]
):
    return client.retrieve(
        collection_name=collection_name, ids=[item[0] for item in rrf_rank_list]
    )
find_point_by_id(client, collection_name, rrf_rank_list)
Next, let's check the ESCI (Exact, Substitute, Complement, and Irrelevant) label for the results against the source data.
ids = [item[0] for item in rrf_rank_list]
df[df["query"] == query_text]
for idx in ids:
    print(df.iloc[idx]["esci_label"])
This was amazing! We retrieved only Exact results with k=10. This is a great result for a small dataset like this, using out-of-the-box vectors that are not even fine-tuned for e-commerce.
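To put a number on this kind of check, one option is precision at k: the share of the top-k results that carry the desired label. The sketch below is a hypothetical helper, not part of the notebook's pipeline. It assumes labels are plain strings such as "Exact", and the example labels are invented rather than taken from the dataset.

```python
from typing import List


def precision_at_k(labels: List[str], k: int, relevant: str = "Exact") -> float:
    """Fraction of the first k labels equal to `relevant` (0.0 if there are none)."""
    top = labels[:k]
    if not top:
        return 0.0
    return sum(label == relevant for label in top) / len(top)


# Invented labels for illustration only.
example_labels = ["Exact", "Exact", "Substitute", "Exact"]
print(precision_at_k(example_labels, k=2))
print(precision_at_k(example_labels, k=4))
```

In the notebook, the labels printed in the loop above could be collected into a list and passed to such a function. A result where every label is Exact corresponds to a precision of 1.0.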
len(rrf_rank_list)
Conclusion
In this notebook, we demonstrated the usage of Hybrid Search with FastEmbed & Qdrant. We used FastEmbed to create Sparse and Dense embeddings for the data and indexed them using Qdrant. We then performed Hybrid Search using FastEmbed & Qdrant and ranked the search results using Reciprocal Rank Fusion (RRF).