diff --git a/integrations/mongodb_atlas/src/haystack_integrations/document_stores/mongodb_atlas/document_store.py b/integrations/mongodb_atlas/src/haystack_integrations/document_stores/mongodb_atlas/document_store.py index 4bb41f98bf..0655b96605 100644 --- a/integrations/mongodb_atlas/src/haystack_integrations/document_stores/mongodb_atlas/document_store.py +++ b/integrations/mongodb_atlas/src/haystack_integrations/document_stores/mongodb_atlas/document_store.py @@ -423,6 +423,114 @@ async def delete_documents_async(self, document_ids: List[str]) -> None: return await self._collection_async.delete_many(filter={"id": {"$in": document_ids}}) + def delete_by_filter(self, filters: Dict[str, Any]) -> int: + """ + Deletes all documents that match the provided filters. + + :param filters: The filters to apply to select documents for deletion. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :returns: The number of documents deleted. + """ + self._ensure_connection_setup() + assert self._collection is not None + + try: + normalized_filters = _normalize_filters(filters) + result = self._collection.delete_many(filter=normalized_filters) + deleted_count = result.deleted_count + logger.info( + "Deleted {n_docs} documents from collection '{collection}' using filters.", + n_docs=deleted_count, + collection=self.collection_name, + ) + return deleted_count + except Exception as e: + msg = f"Failed to delete documents by filter from MongoDB Atlas: {e!s}" + raise DocumentStoreError(msg) from e + + async def delete_by_filter_async(self, filters: Dict[str, Any]) -> int: + """ + Asynchronously deletes all documents that match the provided filters. + + :param filters: The filters to apply to select documents for deletion. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :returns: The number of documents deleted. + """ + await self._ensure_connection_setup_async() + assert self._collection_async is not None + + try: + normalized_filters = _normalize_filters(filters) + result = await self._collection_async.delete_many(filter=normalized_filters) + deleted_count = result.deleted_count + logger.info( + "Deleted {n_docs} documents from collection '{collection}' using filters.", + n_docs=deleted_count, + collection=self.collection_name, + ) + return deleted_count + except Exception as e: + msg = f"Failed to delete documents by filter from MongoDB Atlas: {e!s}" + raise DocumentStoreError(msg) from e + + def update_by_filter(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int: + """ + Updates the metadata of all documents that match the provided filters. + + :param filters: The filters to apply to select documents for updating. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param meta: The metadata fields to update. + :returns: The number of documents updated. + """ + self._ensure_connection_setup() + assert self._collection is not None + + try: + normalized_filters = _normalize_filters(filters) + # Build update operation to set metadata fields + # MongoDB stores documents with flatten=False, so metadata is in the "meta" field + update_fields = {f"meta.{key}": value for key, value in meta.items()} + result = self._collection.update_many(filter=normalized_filters, update={"$set": update_fields}) + updated_count = result.modified_count + logger.info( + "Updated {n_docs} documents in collection '{collection}' using filters.", + n_docs=updated_count, + collection=self.collection_name, + ) + return updated_count + except Exception as e: + msg = f"Failed to update documents by filter in MongoDB Atlas: {e!s}" + raise DocumentStoreError(msg) from e + + async def update_by_filter_async(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int: + """ + Asynchronously updates the metadata of all documents that match the provided filters. + + :param filters: The filters to apply to select documents for updating. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param meta: The metadata fields to update. + :returns: The number of documents updated. + """ + await self._ensure_connection_setup_async() + assert self._collection_async is not None + + try: + normalized_filters = _normalize_filters(filters) + # Build update operation to set metadata fields + # MongoDB stores documents with flatten=False, so metadata is in the "meta" field + update_fields = {f"meta.{key}": value for key, value in meta.items()} + result = await self._collection_async.update_many(filter=normalized_filters, update={"$set": update_fields}) + updated_count = result.modified_count + logger.info( + "Updated {n_docs} documents in collection '{collection}' using filters.", + n_docs=updated_count, + collection=self.collection_name, + ) + return updated_count + except Exception as e: + msg = f"Failed to update documents by filter in MongoDB Atlas: {e!s}" + raise DocumentStoreError(msg) from e + def delete_all_documents(self, *, recreate_collection: bool = False) -> None: """ Deletes all documents in the document store. diff --git a/integrations/mongodb_atlas/tests/test_document_store.py b/integrations/mongodb_atlas/tests/test_document_store.py index b1b351dc22..1eefda46f3 100644 --- a/integrations/mongodb_atlas/tests/test_document_store.py +++ b/integrations/mongodb_atlas/tests/test_document_store.py @@ -334,6 +334,58 @@ def test_custom_content_field(self): finally: database[collection_name].drop() + def test_delete_by_filter(self, document_store: MongoDBAtlasDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + Document(content="Doc 3", meta={"category": "A"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # Delete documents with category="A" + deleted_count = document_store.delete_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "A"} + ) + assert deleted_count == 2 + assert document_store.count_documents() == 1 + + # Verify the remaining document is the one with category="B" + remaining_docs = document_store.filter_documents() + assert len(remaining_docs) == 1 + assert remaining_docs[0].meta["category"] == "B" + + def test_update_by_filter(self, document_store: MongoDBAtlasDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + Document(content="Doc 3", meta={"category": "A"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # Update documents with category="A" to have status="published" + updated_count = document_store.update_by_filter( + filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"} + ) + assert updated_count == 2 + + # Verify the updated documents have the new metadata + published_docs = document_store.filter_documents( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["status"] == "published" + assert doc.meta["category"] == "A" + + # Verify documents with category="B" were not updated + unpublished_docs = document_store.filter_documents( + filters={"field": "meta.category", "operator": "==", "value": "B"} + ) + assert len(unpublished_docs) == 1 + assert "status" not in unpublished_docs[0].meta + def test_delete_all_documents(self, document_store: MongoDBAtlasDocumentStore): docs = [Document(id="1", content="first doc"), Document(id="2", content="second doc")] document_store.write_documents(docs) diff --git a/integrations/mongodb_atlas/tests/test_document_store_async.py b/integrations/mongodb_atlas/tests/test_document_store_async.py index 2660136ca4..4597984360 100644 --- a/integrations/mongodb_atlas/tests/test_document_store_async.py +++ b/integrations/mongodb_atlas/tests/test_document_store_async.py @@ -128,6 +128,58 @@ async def test_delete_documents_async(self, document_store: MongoDBAtlasDocument await document_store.delete_documents_async(document_ids=["1"]) assert await document_store.count_documents_async() == 0 + async def test_delete_by_filter_async(self, document_store: MongoDBAtlasDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + Document(content="Doc 3", meta={"category": "A"}), + ] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 3 + + # Delete documents with category="A" + deleted_count = await document_store.delete_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "A"} + ) + assert deleted_count == 2 + assert await document_store.count_documents_async() == 1 + + # Verify the remaining document is the one with category="B" + remaining_docs = await document_store.filter_documents_async() + assert len(remaining_docs) == 1 + assert remaining_docs[0].meta["category"] == "B" + + async def test_update_by_filter_async(self, document_store: MongoDBAtlasDocumentStore): + docs = [ + Document(content="Doc 1", meta={"category": "A"}), + Document(content="Doc 2", meta={"category": "B"}), + Document(content="Doc 3", meta={"category": "A"}), + ] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 3 + + # Update documents with category="A" to have status="published" + updated_count = await document_store.update_by_filter_async( + filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"} + ) + assert updated_count == 2 + + # Verify the updated documents have the new metadata + published_docs = await document_store.filter_documents_async( + filters={"field": "meta.status", "operator": "==", "value": "published"} + ) + assert len(published_docs) == 2 + for doc in published_docs: + assert doc.meta["status"] == "published" + assert doc.meta["category"] == "A" + + # Verify documents with category="B" were not updated + unpublished_docs = await document_store.filter_documents_async( + filters={"field": "meta.category", "operator": "==", "value": "B"} + ) + assert len(unpublished_docs) == 1 + assert "status" not in unpublished_docs[0].meta + async def test_delete_all_documents_async(self, document_store: MongoDBAtlasDocumentStore): docs = [Document(id="1", content="first doc"), Document(id="2", content="second doc")] await document_store.write_documents_async(docs)