-
Notifications
You must be signed in to change notification settings - Fork 45
added ingest client abstraction #442
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/AddLocalFileSourceV2
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -7,12 +7,19 @@ import com.microsoft.azure.kusto.ingest.v2.common.exceptions.IngestException | |||||
| import com.microsoft.azure.kusto.ingest.v2.infrastructure.HttpResponse | ||||||
| import com.microsoft.azure.kusto.ingest.v2.models.Format | ||||||
| import com.microsoft.azure.kusto.ingest.v2.models.IngestRequestProperties | ||||||
| import com.microsoft.azure.kusto.ingest.v2.models.IngestResponse | ||||||
| import com.microsoft.azure.kusto.ingest.v2.models.StatusResponse | ||||||
| import com.microsoft.azure.kusto.ingest.v2.source.BlobSourceInfo | ||||||
| import com.microsoft.azure.kusto.ingest.v2.source.FileSourceInfo | ||||||
| import com.microsoft.azure.kusto.ingest.v2.source.SourceInfo | ||||||
| import com.microsoft.azure.kusto.ingest.v2.source.StreamSourceInfo | ||||||
| import io.ktor.http.HttpStatusCode | ||||||
| import kotlinx.serialization.SerialName | ||||||
| import kotlinx.serialization.Serializable | ||||||
| import kotlinx.serialization.json.Json | ||||||
| import java.net.ConnectException | ||||||
| import java.net.URI | ||||||
| import java.util.UUID | ||||||
|
|
||||||
| @Serializable | ||||||
| private data class StreamFromBlobRequestBody( | ||||||
|
|
@@ -27,6 +34,108 @@ class StreamingIngestClient( | |||||
| KustoBaseApiClient(engineUrl, tokenCredential, skipSecurityChecks), | ||||||
| IngestClient { | ||||||
|
|
||||||
| /** | ||||||
| * Handles multiple source types for streaming ingestion. | ||||||
| */ | ||||||
| override suspend fun submitIngestion( | ||||||
| database: String, | ||||||
| table: String, | ||||||
| sources: List<SourceInfo>, | ||||||
| format: Format, | ||||||
| ingestProperties: IngestRequestProperties?, | ||||||
| ): IngestResponse { | ||||||
| require(sources.isNotEmpty()) { "At least one source is required" } | ||||||
|
|
||||||
| // Streaming ingestion processes one source at a time | ||||||
| val source = sources.first() | ||||||
| val operationId = UUID.randomUUID().toString() | ||||||
|
|
||||||
| when (source) { | ||||||
| is BlobSourceInfo -> { | ||||||
| logger.info("Streaming ingestion from BlobSource: ${source.blobPath}") | ||||||
| submitStreamingIngestion( | ||||||
| database = database, | ||||||
| table = table, | ||||||
| data = ByteArray(0), // Not used for blob-based streaming | ||||||
| format = format, | ||||||
| ingestProperties = ingestProperties, | ||||||
| blobUrl = source.blobPath | ||||||
| ) | ||||||
| } | ||||||
| is FileSourceInfo -> { | ||||||
| logger.info("Streaming ingestion from FileSource: ${source.name}") | ||||||
| val data = source.data().readBytes() | ||||||
| submitStreamingIngestion( | ||||||
| database = database, | ||||||
| table = table, | ||||||
| data = data, | ||||||
| format = format, | ||||||
| ingestProperties = ingestProperties, | ||||||
| blobUrl = null | ||||||
| ) | ||||||
| source.close() | ||||||
| } | ||||||
| is StreamSourceInfo -> { | ||||||
| logger.info("Streaming ingestion from StreamSource: ${source.name}") | ||||||
| val data = source.data().readBytes() | ||||||
| submitStreamingIngestion( | ||||||
| database = database, | ||||||
| table = table, | ||||||
| data = data, | ||||||
| format = format, | ||||||
| ingestProperties = ingestProperties, | ||||||
| blobUrl = null | ||||||
| ) | ||||||
| source.close() | ||||||
| } | ||||||
| else -> { | ||||||
| throw IngestException( | ||||||
| message = "Unsupported source type for streaming ingestion: ${source::class.simpleName}", | ||||||
| isPermanent = true | ||||||
| ) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| // Streaming ingestion doesn't return an operation ID from the server | ||||||
| // We generate one locally for consistency with the IngestClient interface | ||||||
| return IngestResponse(ingestionOperationId = operationId) | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Note: Streaming ingestion doesn't support operation tracking. | ||||||
| * Throws UnsupportedOperationException. | ||||||
| */ | ||||||
| override suspend fun getIngestionStatus( | ||||||
| database: String, | ||||||
| table: String, | ||||||
| operationId: String, | ||||||
| forceDetails: Boolean, | ||||||
| ): StatusResponse { | ||||||
| throw UnsupportedOperationException( | ||||||
| "Streaming ingestion does not support operation status tracking. " + | ||||||
| "Operation ID: $operationId cannot be tracked. " | ||||||
|
||||||
| "Operation ID: $operationId cannot be tracked. " | |
| "Operation ID: $operationId cannot be tracked." |
Outdated
Copilot
AI
Nov 5, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trailing space at the end of the error message string. Remove the trailing space for cleaner error messages.
| "Operation ID: $operationId cannot be tracked. " | |
| "Operation ID: $operationId cannot be tracked." |
Outdated
Copilot
AI
Nov 5, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unnecessary blank lines (lines 136-138). Remove extra whitespace between method definitions for consistency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Grammatical error: 'interface with provides' should be 'Interface that provides' or 'Interface which provides'.