 import json
 from datetime import datetime
 
+from html_to_markdown import convert
+
 from app.config.constants.ai_models import (
     AzureDocIntelligenceModel,
     OCRProvider,
     OriginTypes,
 )
 from app.config.constants.service import config_node_constants
+from app.exceptions.indexing_exceptions import DocumentProcessingError
 from app.models.entities import Record, RecordStatus, RecordType
 from app.modules.parsers.pdf.docling import DoclingProcessor
 from app.modules.parsers.pdf.ocr_handler import OCRHandler
 from app.modules.transformers.pipeline import IndexingPipeline
 from app.modules.transformers.transformer import TransformContext
 from app.services.docling.client import DoclingClient
 from app.utils.llm import get_llm
+from app.utils.time_conversion import get_epoch_timestamp_in_ms


 def convert_record_dict_to_record(record_dict: dict) -> Record:
@@ -565,24 +569,21 @@ async def process_gmail_message(
         self, recordName, recordId, version, source, orgId, html_content, virtual_record_id
     ) -> None:

-
         self.logger.info("🚀 Processing Gmail Message")

         try:
-            # Convert binary to string
-            html_content = (
-                html_content.decode("utf-8")
-                if isinstance(html_content, bytes)
-                else html_content
+
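+            # Delegate the Gmail HTML body to the shared HTML processing path (decoding and markdown conversion happen there)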
+            await self.process_html_document(
+                recordName=recordName,
+                recordId=recordId,
+                version=version,
+                source=source,
+                orgId=orgId,
+                html_content=html_content,
+                virtual_record_id=virtual_record_id
             )
-            self.logger.debug(f"📄 Decoded HTML content length: {len(html_content)}")

-            # Initialize HTML parser and parse content
-            self.logger.debug("📄 Processing HTML content")
-            parser = self.parsers["html"]
-            html_bytes = parser.parse_string(html_content)
-            await self.process_html_bytes(recordName, recordId, html_bytes, virtual_record_id)
-            self.logger.info("✅ Gmail Message processing completed successfully.")
+            self.logger.info("✅ Gmail Message processing completed successfully using markdown conversion.")

         except Exception as e:
             self.logger.error(f"❌ Error processing Gmail Message document: {str(e)}")
@@ -1079,10 +1080,40 @@ def process_item(ref, level=0, parent_context=None) -> None:
         self.logger.debug(f"Processed {len(ordered_items)} items in order")
         return ordered_items

+    async def _mark_record_as_completed(self, record_id, virtual_record_id) -> None:
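+        """Mark a record as indexed: set indexingStatus to COMPLETED, clear isDirty, and stamp the last index time and virtual record id."""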
+        record = await self.arango_service.get_document(
+            record_id, CollectionNames.RECORDS.value
+        )
+        if not record:
+            raise DocumentProcessingError(
+                "Record not found in database",
+                doc_id=record_id,
+            )
+        doc = dict(record)
+        doc.update(
+            {
+                "indexingStatus": "COMPLETED",
+                "isDirty": False,
+                "lastIndexTimestamp": get_epoch_timestamp_in_ms(),
+                "virtualRecordId": virtual_record_id,
+            }
+        )
+
+        docs = [doc]
+
+        success = await self.arango_service.batch_upsert_nodes(
+            docs, CollectionNames.RECORDS.value
+        )
+        if not success:
+            raise DocumentProcessingError(
+                "Failed to update indexing status", doc_id=record_id
+            )
+        return
+
     async def process_html_document(
-        self, recordName, recordId, version, source, orgId, html_content, virtual_record_id, origin, recordType
+        self, recordName, recordId, version, source, orgId, html_content, virtual_record_id
     ) -> None:
-        """Process HTML document and extract structured content"""
+        """Process HTML document by converting it to markdown and reusing the markdown processing path"""
         self.logger.info(
             f"🚀 Starting HTML document processing for record: {recordName}"
         )
@@ -1096,12 +1127,39 @@ async def process_html_document(
             )
             self.logger.debug(f"📄 Decoded HTML content length: {len(html_content)}")

-            # Initialize HTML parser and parse content
-            self.logger.debug("📄 Processing HTML content")
-            parser = self.parsers[ExtensionTypes.HTML.value]
-            html_bytes = parser.parse_string(html_content)
-            await self.process_html_bytes(recordName, recordId, html_bytes, virtual_record_id)
-            self.logger.info("✅ HTML processing completed successfully.")
+            # Convert HTML to markdown
+            self.logger.debug("📄 Converting HTML to markdown")
+            markdown = convert(html_content)
+            markdown = markdown.strip()
+
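+            # Empty conversion result: nothing to index, so mark the record as completed and return early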
+            if markdown is None or markdown == "":
+                try:
+                    await self._mark_record_as_completed(recordId, virtual_record_id)
+                    self.logger.info("✅ HTML processing completed successfully using markdown conversion.")
+                    return
+                except DocumentProcessingError:
+                    raise
+                except Exception as e:
+                    raise DocumentProcessingError(
+                        "Error updating record status: " + str(e),
+                        doc_id=recordId,
+                        details={"error": str(e)},
+                    )
+            # Convert markdown content to bytes for processing
+            md_binary = markdown.encode("utf-8")
+
+            # Use the existing markdown processing function
+            await self.process_md_document(
+                recordName=recordName,
+                recordId=recordId,
+                version=version,
+                source=source,
+                orgId=orgId,
+                md_binary=md_binary,
+                virtual_record_id=virtual_record_id
+            )
+
+            self.logger.info("✅ HTML processing completed successfully using markdown conversion.")


         except Exception as e:
@@ -1586,28 +1644,3 @@ async def process_ppt_document(

         return {"status": "success", "message": "PPT processed successfully"}

-    async def process_html_bytes(self, recordName, recordId, html_bytes, virtual_record_id) -> None:
-        """Process HTML bytes and extract structured content"""
-        self.logger.info(f"🚀 Starting HTML document processing for record: {recordName}")
-        try:
-            processor = DoclingProcessor(logger=self.logger, config=self.config_service)
-            record_name = recordName if recordName.endswith(".html") else f"{recordName}.html"
-            block_containers = await processor.load_document(record_name, html_bytes)
-            if block_containers is False:
-                raise Exception("Failed to process HTML document. It might contain scanned pages.")
-
-            record = await self.arango_service.get_document(
-                recordId, CollectionNames.RECORDS.value
-            )
-            if record is None:
-                self.logger.error(f"❌ Record {recordId} not found in database")
-                raise Exception(f"Record {recordId} not found in graph db")
-            record = convert_record_dict_to_record(record)
-            record.block_containers = block_containers
-            record.virtual_record_id = virtual_record_id
-            ctx = TransformContext(record=record)
-            pipeline = IndexingPipeline(document_extraction=self.document_extraction, sink_orchestrator=self.sink_orchestrator)
-            await pipeline.apply(ctx)
-        except Exception as e:
-            self.logger.error(f"❌ Error processing HTML bytes: {str(e)}")
-            raise