Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,82 @@ package com.microsoft.azure.kusto.ingest.v2

import com.microsoft.azure.kusto.ingest.v2.common.exceptions.IngestException
import com.microsoft.azure.kusto.ingest.v2.infrastructure.HttpResponse
import com.microsoft.azure.kusto.ingest.v2.models.Format
import com.microsoft.azure.kusto.ingest.v2.models.IngestRequestProperties
import com.microsoft.azure.kusto.ingest.v2.models.IngestResponse
import com.microsoft.azure.kusto.ingest.v2.models.StatusResponse
import com.microsoft.azure.kusto.ingest.v2.source.SourceInfo
import io.ktor.http.HttpStatusCode
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.net.ConnectException

/**
* interface with provides core abstraction for ingesting data into Kusto.
Copy link

Copilot AI Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grammatical error: 'interface with provides' should be 'Interface that provides' or 'Interface which provides'.

Suggested change
* interface with provides core abstraction for ingesting data into Kusto.
* Interface that provides core abstraction for ingesting data into Kusto.

Copilot uses AI. Check for mistakes.
*
* Supports multiple source types:
* - BlobSourceInfo: Ingest from Azure Blob Storage
* - FileSourceInfo: Ingest from local files
* - StreamSourceInfo: Ingest from in-memory streams
*/
interface IngestClient {

val logger: Logger
get() = LoggerFactory.getLogger(IngestClient::class.java)

/**
* Submits an ingestion request from any source type.
*
* @param database The target database name
* @param table The target table name
* @param sources List of sources to ingest (BlobSourceInfo, FileSourceInfo,
* or StreamSourceInfo)
* @param format The data format (CSV, JSON, others)
* @param ingestProperties Optional ingestion properties
* @return IngestResponse containing the operation ID for tracking
*/
suspend fun submitIngestion(
database: String,
table: String,
sources: List<SourceInfo>,
format: Format = Format.csv,
ingestProperties: IngestRequestProperties? = null,
): IngestResponse

/**
* Gets the status of an ingestion operation.
*
* @param database The target database name
* @param table The target table name
* @param operationId The operation ID returned from submitIngestion
* @param forceDetails Whether to force retrieval of detailed information
* @return StatusResponse containing the current status
*/
suspend fun getIngestionStatus(
database: String,
table: String,
operationId: String,
forceDetails: Boolean = false,
): StatusResponse

/**
* Gets detailed information about an ingestion operation.
*
* @param database The target database name
* @param table The target table name
* @param operationId The operation ID returned from submitIngestion
* @param details Whether to retrieve detailed blob-level information
* @return StatusResponse containing operation details
* @throws UnsupportedOperationException if the implementation doesn't
* support operation tracking
*/
suspend fun getIngestionDetails(
database: String,
table: String,
operationId: String,
details: Boolean = true,
): StatusResponse

// Common way to parse ingestion response for both Streaming and Queued ingestion

suspend fun <T : Any> handleIngestResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.microsoft.azure.kusto.ingest.v2
import com.azure.core.credential.TokenCredential
import com.azure.core.credential.TokenRequestContext
import com.microsoft.azure.kusto.ingest.v2.apis.DefaultApi
import com.microsoft.azure.kusto.ingest.v2.common.ClientDetails
import com.microsoft.azure.kusto.ingest.v2.common.serialization.OffsetDateTimeSerializer
import io.ktor.client.HttpClientConfig
import io.ktor.client.plugins.DefaultRequest
Expand All @@ -27,6 +28,7 @@ open class KustoBaseApiClient(
open val dmUrl: String,
open val tokenCredential: TokenCredential,
open val skipSecurityChecks: Boolean = false,
open val clientDetails: ClientDetails? = null,
) {
private val logger = LoggerFactory.getLogger(KustoBaseApiClient::class.java)
protected val setupConfig: (HttpClientConfig<*>) -> Unit = { config ->
Expand All @@ -40,6 +42,13 @@ open class KustoBaseApiClient(
private fun getClientConfig(config: HttpClientConfig<*>) {
config.install(DefaultRequest) {
header("Content-Type", "application/json")

// Add client details headers if provided
clientDetails?.let { details ->
header("x-ms-app", details.getApplicationForTracing())
header("x-ms-user", details.getUserNameForTracing())
header("x-ms-client-version", details.getClientVersionForTracing())
}
}
val trc = TokenRequestContext().addScopes("$dmUrl/.default")
config.install(Auth) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.microsoft.azure.kusto.ingest.v2

import com.azure.core.credential.TokenCredential
import com.microsoft.azure.kusto.ingest.v2.common.ClientDetails
import com.microsoft.azure.kusto.ingest.v2.common.DefaultConfigurationCache
import com.microsoft.azure.kusto.ingest.v2.common.exceptions.IngestException
import com.microsoft.azure.kusto.ingest.v2.common.utils.IngestionResultUtils
Expand Down Expand Up @@ -31,10 +32,42 @@ class QueuedIngestionClient(
override val dmUrl: String,
override val tokenCredential: TokenCredential,
override val skipSecurityChecks: Boolean = false,
override val clientDetails: ClientDetails? = null,
) :
KustoBaseApiClient(dmUrl, tokenCredential, skipSecurityChecks),
KustoBaseApiClient(dmUrl, tokenCredential, skipSecurityChecks, clientDetails),
IngestClient {

override suspend fun submitIngestion(
database: String,
table: String,
sources: List<com.microsoft.azure.kusto.ingest.v2.source.SourceInfo>,
format: Format,
ingestProperties: IngestRequestProperties?,
): IngestResponse {
val abstractSources = sources.map { it as AbstractSourceInfo }
return submitQueuedIngestion(
database,
table,
abstractSources,
format,
ingestProperties,
)
}

override suspend fun getIngestionStatus(
database: String,
table: String,
operationId: String,
forceDetails: Boolean,
): StatusResponse {
return getIngestionStatusInternal(
database,
table,
operationId,
forceDetails,
)
}

private val defaultConfigurationCache =
DefaultConfigurationCache(
dmUrl = dmUrl,
Expand Down Expand Up @@ -158,16 +191,15 @@ class QueuedIngestionClient(
}

/**
* Gets a summary of the ingestion operation status (lightweight, fast).
* This method provides overall status counters without detailed blob
* information. Use this for quick status checks and polling scenarios.
* Gets detailed information about an ingestion operation.
*
* @param database The target database name
* @param table The target table name
* @param operationId The operation ID returned from the ingestion request
* @return Updated IngestionOperation with status summary
* @param details Whether to retrieve detailed blob-level information
* @return StatusResponse with operation details
*/
private suspend fun getIngestionDetails(
override suspend fun getIngestionDetails(
database: String,
table: String,
operationId: String,
Expand Down Expand Up @@ -279,7 +311,7 @@ class QueuedIngestionClient(
* operation status
* @return Updated IngestionOperation with current status
*/
suspend fun getIngestionStatus(
private suspend fun getIngestionStatusInternal(
database: String,
table: String,
operationId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,24 @@
package com.microsoft.azure.kusto.ingest.v2

import com.azure.core.credential.TokenCredential
import com.microsoft.azure.kusto.ingest.v2.common.ClientDetails
import com.microsoft.azure.kusto.ingest.v2.common.exceptions.IngestException
import com.microsoft.azure.kusto.ingest.v2.infrastructure.HttpResponse
import com.microsoft.azure.kusto.ingest.v2.models.Format
import com.microsoft.azure.kusto.ingest.v2.models.IngestRequestProperties
import com.microsoft.azure.kusto.ingest.v2.models.IngestResponse
import com.microsoft.azure.kusto.ingest.v2.models.StatusResponse
import com.microsoft.azure.kusto.ingest.v2.source.BlobSourceInfo
import com.microsoft.azure.kusto.ingest.v2.source.FileSourceInfo
import com.microsoft.azure.kusto.ingest.v2.source.SourceInfo
import com.microsoft.azure.kusto.ingest.v2.source.StreamSourceInfo
import io.ktor.http.HttpStatusCode
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import java.net.ConnectException
import java.net.URI
import java.util.UUID

@Serializable
private data class StreamFromBlobRequestBody(
Expand All @@ -23,10 +31,116 @@ class StreamingIngestClient(
val engineUrl: String,
override val tokenCredential: TokenCredential,
override val skipSecurityChecks: Boolean = false,
override val clientDetails: ClientDetails? = null,
) :
KustoBaseApiClient(engineUrl, tokenCredential, skipSecurityChecks),
KustoBaseApiClient(engineUrl, tokenCredential, skipSecurityChecks, clientDetails),
IngestClient {

/** Handles multiple source types for streaming ingestion. */
override suspend fun submitIngestion(
database: String,
table: String,
sources: List<SourceInfo>,
format: Format,
ingestProperties: IngestRequestProperties?,
): IngestResponse {
require(sources.isNotEmpty()) { "At least one source is required" }

// Streaming ingestion processes one source at a time
val source = sources.first()
val operationId = UUID.randomUUID().toString()

when (source) {
is BlobSourceInfo -> {
logger.info(
"Streaming ingestion from BlobSource: ${source.blobPath}",
)
submitStreamingIngestion(
database = database,
table = table,
// Not used for blob-based streaming
data = ByteArray(0),
format = format,
ingestProperties = ingestProperties,
blobUrl = source.blobPath,
)
}
is FileSourceInfo -> {
logger.info(
"Streaming ingestion from FileSource: ${source.name}",
)
val data = source.data().readBytes()
submitStreamingIngestion(
database = database,
table = table,
data = data,
format = format,
ingestProperties = ingestProperties,
blobUrl = null,
)
source.close()
}
is StreamSourceInfo -> {
logger.info(
"Streaming ingestion from StreamSource: ${source.name}",
)
val data = source.data().readBytes()
submitStreamingIngestion(
database = database,
table = table,
data = data,
format = format,
ingestProperties = ingestProperties,
blobUrl = null,
)
source.close()
}
else -> {
throw IngestException(
message =
"Unsupported source type for streaming ingestion: ${source::class.simpleName}",
isPermanent = true,
)
}
}

// Streaming ingestion doesn't return an operation ID from the server
// We generate one locally for consistency with the IngestClient interface
return IngestResponse(ingestionOperationId = operationId)
}

/**
* Note: Streaming ingestion doesn't support operation tracking. Throws
* UnsupportedOperationException.
*/
override suspend fun getIngestionStatus(
database: String,
table: String,
operationId: String,
forceDetails: Boolean,
): StatusResponse {
throw UnsupportedOperationException(
"Streaming ingestion does not support operation status tracking. " +
"Operation ID: $operationId cannot be tracked. ",
)
}

/**
* Note: Streaming ingestion doesn't support operation tracking. Throws
* UnsupportedOperationException.
*/
override suspend fun getIngestionDetails(
database: String,
table: String,
operationId: String,
details: Boolean,
): StatusResponse {
throw UnsupportedOperationException(
"Streaming ingestion does not support detailed operation tracking. " +
"Operation ID: $operationId cannot be tracked. ",
)
}

/**
* Submits a streaming ingestion request.
*
Expand Down
Loading
Loading