Skip to content

Commit 5641b7c

Browse files
feat: adding delete_all_docs to Qdrant document store (#2363)
* cleaning up/improving docstrings * wip * adding sync methods + tests * removing unused import * linting issues and formatting * wip: dealing with type issues in collection recreation * fixing type issues * adding async version + tests * fixing wrong type of assignment error * Update integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py Co-authored-by: Amna Mubashar <[email protected]> * setting to True instead of using the variable value for consistency with sync version --------- Co-authored-by: Amna Mubashar <[email protected]>
1 parent d5cc99f commit 5641b7c

File tree

3 files changed

+196
-9
lines changed

3 files changed

+196
-9
lines changed

integrations/qdrant/src/haystack_integrations/document_stores/qdrant/document_store.py

Lines changed: 118 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,8 @@ def get_batches_from_generator(iterable: List, n: int) -> Generator:
5353

5454
class QdrantDocumentStore:
5555
"""
56-
A QdrantDocumentStore implementation that you
57-
can use with any Qdrant instance: in-memory, disk-persisted, Docker-based,
58-
and Qdrant Cloud Cluster deployments.
56+
A QdrantDocumentStore implementation that you can use with any Qdrant instance: in-memory, disk-persisted,
57+
Docker-based, and Qdrant Cloud Cluster deployments.
5958
6059
Usage example by creating an in-memory instance:
6160
@@ -65,7 +64,8 @@ class QdrantDocumentStore:
6564
6665
document_store = QdrantDocumentStore(
6766
":memory:",
68-
recreate_index=True
67+
recreate_index=True,
68+
embedding_dim=5
6969
)
7070
document_store.write_documents([
7171
Document(content="This is first", embedding=[0.0]*5),
@@ -135,6 +135,8 @@ def __init__(
135135
payload_fields_to_index: Optional[List[dict]] = None,
136136
) -> None:
137137
"""
138+
Initializes a QdrantDocumentStore.
139+
138140
:param location:
139141
If `":memory:"` - use in-memory Qdrant instance.
140142
If `str` - use it as a URL parameter.
@@ -350,7 +352,7 @@ def filter_documents(
350352
# No need to initialize client here as _get_documents_generator
351353
# will handle client initialization internally
352354

353-
self._validate_filters(filters)
355+
QdrantDocumentStore._validate_filters(filters)
354356
return list(
355357
self._get_documents_generator(
356358
filters,
@@ -367,7 +369,7 @@ async def filter_documents_async(
367369
# No need to initialize client here as _get_documents_generator_async
368370
# will handle client initialization internally
369371

370-
self._validate_filters(filters)
372+
QdrantDocumentStore._validate_filters(filters)
371373
return [doc async for doc in self._get_documents_generator_async(filters)]
372374

373375
def write_documents(
@@ -521,6 +523,108 @@ async def delete_documents_async(self, document_ids: List[str]) -> None:
521523
"Called QdrantDocumentStore.delete_documents_async() on a non-existing ID",
522524
)
523525

526+
def delete_all_documents(self, recreate_index: bool = False) -> None:
527+
"""
528+
Deletes all documents from the document store.
529+
530+
:param recreate_index: Whether to recreate the index after deleting all documents.
531+
"""
532+
533+
self._initialize_client()
534+
assert self._client is not None
535+
536+
if recreate_index:
537+
# get current collection config as json
538+
collection_info = self._client.get_collection(collection_name=self.index)
539+
info_json = collection_info.model_dump()
540+
541+
# deal with the Optional use_sparse_embeddings
542+
sparse_vectors = info_json["config"]["params"]["sparse_vectors"]
543+
use_sparse_embeddings = True if sparse_vectors else False
544+
545+
# deal with the Optional sparse_idf
546+
hnsw_config = info_json["config"]["params"]["vectors"].get("config", {}).get("hnsw_config", None)
547+
sparse_idf = True if use_sparse_embeddings and hnsw_config else False
548+
549+
# recreate collection
550+
self._set_up_collection(
551+
collection_name=self.index,
552+
embedding_dim=info_json["config"]["params"]["vectors"]["size"],
553+
recreate_collection=True,
554+
similarity=info_json["config"]["params"]["vectors"]["distance"].lower(),
555+
use_sparse_embeddings=use_sparse_embeddings,
556+
sparse_idf=sparse_idf,
557+
on_disk=info_json["config"]["hnsw_config"]["on_disk"],
558+
payload_fields_to_index=info_json["payload_schema"],
559+
)
560+
561+
else:
562+
try:
563+
self._client.delete(
564+
collection_name=self.index,
565+
points_selector=rest.FilterSelector(
566+
filter=rest.Filter(
567+
must=[],
568+
)
569+
),
570+
wait=self.wait_result_from_api,
571+
)
572+
except Exception as e:
573+
logger.warning(
574+
f"Error {e} when calling QdrantDocumentStore.delete_all_documents()",
575+
)
576+
577+
async def delete_all_documents_async(self, recreate_index: bool = False) -> None:
578+
"""
579+
Asynchronously deletes all documents from the document store.
580+
581+
:param recreate_index: Whether to recreate the index after deleting all documents.
582+
"""
583+
584+
await self._initialize_async_client()
585+
assert self._async_client is not None
586+
587+
if recreate_index:
588+
# get current collection config as json
589+
collection_info = await self._async_client.get_collection(collection_name=self.index)
590+
info_json = collection_info.model_dump()
591+
592+
# deal with the Optional use_sparse_embeddings
593+
sparse_vectors = info_json["config"]["params"]["sparse_vectors"]
594+
use_sparse_embeddings = True if sparse_vectors else False
595+
596+
# deal with the Optional sparse_idf
597+
hnsw_config = info_json["config"]["params"]["vectors"].get("config", {}).get("hnsw_config", None)
598+
sparse_idf = True if use_sparse_embeddings and hnsw_config else False
599+
600+
# recreate collection
601+
await self._set_up_collection_async(
602+
collection_name=self.index,
603+
embedding_dim=info_json["config"]["params"]["vectors"]["size"],
604+
recreate_collection=True,
605+
similarity=info_json["config"]["params"]["vectors"]["distance"].lower(),
606+
use_sparse_embeddings=use_sparse_embeddings,
607+
sparse_idf=sparse_idf,
608+
on_disk=info_json["config"]["hnsw_config"]["on_disk"],
609+
payload_fields_to_index=info_json["payload_schema"],
610+
)
611+
612+
else:
613+
try:
614+
await self._async_client.delete(
615+
collection_name=self.index,
616+
points_selector=rest.FilterSelector(
617+
filter=rest.Filter(
618+
must=[],
619+
)
620+
),
621+
wait=self.wait_result_from_api,
622+
)
623+
except Exception as e:
624+
logger.warning(
625+
f"Error {e} when calling QdrantDocumentStore.delete_all_documents_async()",
626+
)
627+
524628
@classmethod
525629
def from_dict(cls, data: Dict[str, Any]) -> "QdrantDocumentStore":
526630
"""
@@ -1214,7 +1318,8 @@ def get_distance(self, similarity: str) -> rest.Distance:
12141318

12151319
def _create_payload_index(self, collection_name: str, payload_fields_to_index: Optional[List[dict]] = None) -> None:
12161320
"""
1217-
Create payload index for the collection if payload_fields_to_index is provided
1321+
Create payload index for the collection if payload_fields_to_index is provided.
1322+
12181323
See: https://qdrant.tech/documentation/concepts/indexing/#payload-index
12191324
"""
12201325
if payload_fields_to_index is not None:
@@ -1233,7 +1338,8 @@ async def _create_payload_index_async(
12331338
self, collection_name: str, payload_fields_to_index: Optional[List[dict]] = None
12341339
) -> None:
12351340
"""
1236-
Asynchronously create payload index for the collection if payload_fields_to_index is provided
1341+
Asynchronously create payload index for the collection if payload_fields_to_index is provided.
1342+
12371343
See: https://qdrant.tech/documentation/concepts/indexing/#payload-index
12381344
"""
12391345
if payload_fields_to_index is not None:
@@ -1261,6 +1367,7 @@ def _set_up_collection(
12611367
) -> None:
12621368
"""
12631369
Sets up the Qdrant collection with the specified parameters.
1370+
12641371
:param collection_name:
12651372
The name of the collection to set up.
12661373
:param embedding_dim:
@@ -1317,6 +1424,7 @@ async def _set_up_collection_async(
13171424
) -> None:
13181425
"""
13191426
Asynchronously sets up the Qdrant collection with the specified parameters.
1427+
13201428
:param collection_name:
13211429
The name of the collection to set up.
13221430
:param embedding_dim:
@@ -1601,7 +1709,8 @@ def _prepare_collection_config(
16011709

16021710
return vectors_config, sparse_vectors_config
16031711

1604-
def _validate_filters(self, filters: Optional[Union[Dict[str, Any], rest.Filter]] = None) -> None:
1712+
@staticmethod
1713+
def _validate_filters(filters: Optional[Union[Dict[str, Any], rest.Filter]] = None) -> None:
16051714
"""
16061715
Validates the filters provided for querying.
16071716

integrations/qdrant/tests/test_document_store.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,3 +297,41 @@ def test_set_up_collection_with_dimension_mismatch(self):
297297
):
298298
with pytest.raises(ValueError, match="different vector size"):
299299
document_store._set_up_collection("test_collection", 768, False, "cosine", False, False)
300+
301+
def test_delete_all_documents_no_index_recreation(self, document_store):
302+
document_store._initialize_client()
303+
304+
# write some documents
305+
docs = [Document(id=str(i)) for i in range(5)]
306+
document_store.write_documents(docs)
307+
308+
# delete all documents without recreating the index
309+
document_store.delete_all_documents(recreate_index=False)
310+
assert document_store.count_documents() == 0
311+
312+
# ensure the collection still exists by writing documents again
313+
document_store.write_documents(docs)
314+
assert document_store.count_documents() == 5
315+
316+
def test_delete_all_documents_index_recreation(self, document_store):
317+
document_store._initialize_client()
318+
319+
# write some documents
320+
docs = [Document(id=str(i)) for i in range(5)]
321+
document_store.write_documents(docs)
322+
323+
# get the current document_store config
324+
config_before = document_store._client.get_collection(document_store.index)
325+
326+
# delete all documents with recreating the index
327+
document_store.delete_all_documents(recreate_index=True)
328+
assert document_store.count_documents() == 0
329+
330+
# assure that with the same config
331+
config_after = document_store._client.get_collection(document_store.index)
332+
333+
assert config_before.config == config_after.config
334+
335+
# ensure the collection still exists by writing documents again
336+
document_store.write_documents(docs)
337+
assert document_store.count_documents() == 5

integrations/qdrant/tests/test_document_store_async.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,3 +218,43 @@ async def test_set_up_collection_with_distance_mismatch_async(self):
218218
):
219219
with pytest.raises(ValueError, match="different similarity"):
220220
await document_store._set_up_collection_async("test_collection", 768, False, "cosine", False, False)
221+
222+
@pytest.mark.asyncio
223+
async def test_delete_all_documents_async_no_index_recreation(self, document_store):
224+
await document_store._initialize_async_client()
225+
226+
# write some documents
227+
docs = [Document(id=str(i)) for i in range(5)]
228+
await document_store.write_documents_async(docs)
229+
230+
# delete all documents without recreating the index
231+
await document_store.delete_all_documents_async(recreate_index=False)
232+
assert await document_store.count_documents_async() == 0
233+
234+
# ensure the collection still exists by writing documents again
235+
await document_store.write_documents_async(docs)
236+
assert await document_store.count_documents_async() == 5
237+
238+
@pytest.mark.asyncio
239+
async def test_delete_all_documents_async_index_recreation(self, document_store):
240+
await document_store._initialize_async_client()
241+
242+
# write some documents
243+
docs = [Document(id=str(i)) for i in range(5)]
244+
await document_store.write_documents_async(docs)
245+
246+
# get the current document_store config
247+
config_before = await document_store._async_client.get_collection(document_store.index)
248+
249+
# delete all documents with recreating the index
250+
await document_store.delete_all_documents_async(recreate_index=True)
251+
assert await document_store.count_documents_async() == 0
252+
253+
# assure that with the same config
254+
config_after = await document_store._async_client.get_collection(document_store.index)
255+
256+
assert config_before.config == config_after.config
257+
258+
# ensure the collection still exists by writing documents again
259+
await document_store.write_documents_async(docs)
260+
assert await document_store.count_documents_async() == 5

0 commit comments

Comments
 (0)