Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,114 @@ async def delete_documents_async(self, document_ids: List[str]) -> None:
return
await self._collection_async.delete_many(filter={"id": {"$in": document_ids}})

def delete_by_filter(self, filters: Dict[str, Any]) -> int:
"""
Deletes all documents that match the provided filters.

:param filters: The filters to apply to select documents for deletion.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:returns: The number of documents deleted.
"""
self._ensure_connection_setup()
assert self._collection is not None

try:
normalized_filters = _normalize_filters(filters)
result = self._collection.delete_many(filter=normalized_filters)
deleted_count = result.deleted_count
logger.info(
"Deleted {n_docs} documents from collection '{collection}' using filters.",
n_docs=deleted_count,
collection=self.collection_name,
)
return deleted_count
except Exception as e:
msg = f"Failed to delete documents by filter from MongoDB Atlas: {e!s}"
raise DocumentStoreError(msg) from e

async def delete_by_filter_async(self, filters: Dict[str, Any]) -> int:
"""
Asynchronously deletes all documents that match the provided filters.

:param filters: The filters to apply to select documents for deletion.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:returns: The number of documents deleted.
"""
await self._ensure_connection_setup_async()
assert self._collection_async is not None

try:
normalized_filters = _normalize_filters(filters)
result = await self._collection_async.delete_many(filter=normalized_filters)
deleted_count = result.deleted_count
logger.info(
"Deleted {n_docs} documents from collection '{collection}' using filters.",
n_docs=deleted_count,
collection=self.collection_name,
)
return deleted_count
except Exception as e:
msg = f"Failed to delete documents by filter from MongoDB Atlas: {e!s}"
raise DocumentStoreError(msg) from e

def update_by_filter(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int:
"""
Updates the metadata of all documents that match the provided filters.

:param filters: The filters to apply to select documents for updating.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:param meta: The metadata fields to update.
:returns: The number of documents updated.
"""
self._ensure_connection_setup()
assert self._collection is not None

try:
normalized_filters = _normalize_filters(filters)
# Build update operation to set metadata fields
# MongoDB stores documents with flatten=False, so metadata is in the "meta" field
update_fields = {f"meta.{key}": value for key, value in meta.items()}
result = self._collection.update_many(filter=normalized_filters, update={"$set": update_fields})
updated_count = result.modified_count
logger.info(
"Updated {n_docs} documents in collection '{collection}' using filters.",
n_docs=updated_count,
collection=self.collection_name,
)
return updated_count
except Exception as e:
msg = f"Failed to update documents by filter in MongoDB Atlas: {e!s}"
raise DocumentStoreError(msg) from e

async def update_by_filter_async(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int:
"""
Asynchronously updates the metadata of all documents that match the provided filters.

:param filters: The filters to apply to select documents for updating.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:param meta: The metadata fields to update.
:returns: The number of documents updated.
"""
await self._ensure_connection_setup_async()
assert self._collection_async is not None

try:
normalized_filters = _normalize_filters(filters)
# Build update operation to set metadata fields
# MongoDB stores documents with flatten=False, so metadata is in the "meta" field
update_fields = {f"meta.{key}": value for key, value in meta.items()}
result = await self._collection_async.update_many(filter=normalized_filters, update={"$set": update_fields})
updated_count = result.modified_count
logger.info(
"Updated {n_docs} documents in collection '{collection}' using filters.",
n_docs=updated_count,
collection=self.collection_name,
)
return updated_count
except Exception as e:
msg = f"Failed to update documents by filter in MongoDB Atlas: {e!s}"
raise DocumentStoreError(msg) from e

def delete_all_documents(self, *, recreate_collection: bool = False) -> None:
"""
Deletes all documents in the document store.
Expand Down
52 changes: 52 additions & 0 deletions integrations/mongodb_atlas/tests/test_document_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,58 @@ def test_custom_content_field(self):
finally:
database[collection_name].drop()

def test_delete_by_filter(self, document_store: MongoDBAtlasDocumentStore):
docs = [
Document(content="Doc 1", meta={"category": "A"}),
Document(content="Doc 2", meta={"category": "B"}),
Document(content="Doc 3", meta={"category": "A"}),
]
document_store.write_documents(docs)
assert document_store.count_documents() == 3

# Delete documents with category="A"
deleted_count = document_store.delete_by_filter(
filters={"field": "meta.category", "operator": "==", "value": "A"}
)
assert deleted_count == 2
assert document_store.count_documents() == 1

# Verify the remaining document is the one with category="B"
remaining_docs = document_store.filter_documents()
assert len(remaining_docs) == 1
assert remaining_docs[0].meta["category"] == "B"

def test_update_by_filter(self, document_store: MongoDBAtlasDocumentStore):
docs = [
Document(content="Doc 1", meta={"category": "A"}),
Document(content="Doc 2", meta={"category": "B"}),
Document(content="Doc 3", meta={"category": "A"}),
]
document_store.write_documents(docs)
assert document_store.count_documents() == 3

# Update documents with category="A" to have status="published"
updated_count = document_store.update_by_filter(
filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"}
)
assert updated_count == 2

# Verify the updated documents have the new metadata
published_docs = document_store.filter_documents(
filters={"field": "meta.status", "operator": "==", "value": "published"}
)
assert len(published_docs) == 2
for doc in published_docs:
assert doc.meta["status"] == "published"
assert doc.meta["category"] == "A"

# Verify documents with category="B" were not updated
unpublished_docs = document_store.filter_documents(
filters={"field": "meta.category", "operator": "==", "value": "B"}
)
assert len(unpublished_docs) == 1
assert "status" not in unpublished_docs[0].meta

def test_delete_all_documents(self, document_store: MongoDBAtlasDocumentStore):
docs = [Document(id="1", content="first doc"), Document(id="2", content="second doc")]
document_store.write_documents(docs)
Expand Down
52 changes: 52 additions & 0 deletions integrations/mongodb_atlas/tests/test_document_store_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,58 @@ async def test_delete_documents_async(self, document_store: MongoDBAtlasDocument
await document_store.delete_documents_async(document_ids=["1"])
assert await document_store.count_documents_async() == 0

async def test_delete_by_filter_async(self, document_store: MongoDBAtlasDocumentStore):
docs = [
Document(content="Doc 1", meta={"category": "A"}),
Document(content="Doc 2", meta={"category": "B"}),
Document(content="Doc 3", meta={"category": "A"}),
]
await document_store.write_documents_async(docs)
assert await document_store.count_documents_async() == 3

# Delete documents with category="A"
deleted_count = await document_store.delete_by_filter_async(
filters={"field": "meta.category", "operator": "==", "value": "A"}
)
assert deleted_count == 2
assert await document_store.count_documents_async() == 1

# Verify the remaining document is the one with category="B"
remaining_docs = await document_store.filter_documents_async()
assert len(remaining_docs) == 1
assert remaining_docs[0].meta["category"] == "B"

async def test_update_by_filter_async(self, document_store: MongoDBAtlasDocumentStore):
docs = [
Document(content="Doc 1", meta={"category": "A"}),
Document(content="Doc 2", meta={"category": "B"}),
Document(content="Doc 3", meta={"category": "A"}),
]
await document_store.write_documents_async(docs)
assert await document_store.count_documents_async() == 3

# Update documents with category="A" to have status="published"
updated_count = await document_store.update_by_filter_async(
filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"}
)
assert updated_count == 2

# Verify the updated documents have the new metadata
published_docs = await document_store.filter_documents_async(
filters={"field": "meta.status", "operator": "==", "value": "published"}
)
assert len(published_docs) == 2
for doc in published_docs:
assert doc.meta["status"] == "published"
assert doc.meta["category"] == "A"

# Verify documents with category="B" were not updated
unpublished_docs = await document_store.filter_documents_async(
filters={"field": "meta.category", "operator": "==", "value": "B"}
)
assert len(unpublished_docs) == 1
assert "status" not in unpublished_docs[0].meta

async def test_delete_all_documents_async(self, document_store: MongoDBAtlasDocumentStore):
docs = [Document(id="1", content="first doc"), Document(id="2", content="second doc")]
await document_store.write_documents_async(docs)
Expand Down