Skip to content
Open
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2ad23f4
* Add boilerplate
ag-ramachandran Sep 2, 2025
8a031a1
* Move code forward
ag-ramachandran Sep 8, 2025
6e6998c
Add IngestionSource Blob type and QueuedIngest
tanmaya-panda1 Sep 13, 2025
debcb10
removed unnecessary methods
tanmaya-panda1 Sep 13, 2025
b60ea4f
* Reformat code
ag-ramachandran Sep 27, 2025
a548559
* Update POM
ag-ramachandran Sep 27, 2025
fb68d1c
* Remove gitignore (redundant)
ag-ramachandran Sep 27, 2025
e0fc2c8
* Add a todo for fixing
ag-ramachandran Sep 27, 2025
7dfd173
* Add some more tests
ag-ramachandran Sep 29, 2025
c261be7
* WIP with failing tests
ag-ramachandran Oct 3, 2025
c2b83ed
* WIP with failing tests
ag-ramachandran Oct 3, 2025
9db6537
* WIP with failing tests
ag-ramachandran Oct 3, 2025
be86fe3
* WIP add additional tests
ag-ramachandran Oct 5, 2025
d7819c5
* WIP: Fix review comments
ag-ramachandran Oct 5, 2025
df44c94
* WIP: Fix review comments
ag-ramachandran Oct 5, 2025
3d51aa4
* Add additional tests
ag-ramachandran Oct 6, 2025
2a1c55c
* Add support for streaming ingest
ag-ramachandran Oct 6, 2025
b614009
fixed streaming ingestion binary kusto format and added happy flow te…
tanmaya-panda1 Oct 7, 2025
ea95002
* Format changes
ag-ramachandran Oct 7, 2025
a8fa935
* Refactor tests
ag-ramachandran Oct 9, 2025
fc24849
* Add tests for ingestion mapping inline
ag-ramachandran Oct 13, 2025
5eebd47
* Refactor for inline mapping
ag-ramachandran Oct 13, 2025
3bf0e6d
added bloburl to streaming
tanmaya-panda1 Oct 16, 2025
94e92f1
refactored Streaming ingestion tests
tanmaya-panda1 Oct 16, 2025
1b946e0
* rebase changes
ag-ramachandran Oct 31, 2025
ffe3cda
* rebase changes
ag-ramachandran Oct 31, 2025
2de495c
* Add TODO for OneApiException scenario
ag-ramachandran Oct 31, 2025
94eb3a6
* Add TODO for OneApiException scenario
ag-ramachandran Oct 31, 2025
abc6df8
* Fix review comments
ag-ramachandran Oct 31, 2025
7a9e8e2
*Address some more review comments
ag-ramachandran Oct 31, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 64 additions & 6 deletions ingest-v2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<description>ingest-v2</description>
<properties>
<kotlin.code.style>official</kotlin.code.style>
<kotlin.version>2.2.20</kotlin.version>
<kotlin.version>2.2.10</kotlin.version>
<ktor.version>3.3.0</ktor.version>
<ktor.async.api.version>3.1.1</ktor.async.api.version>
<kotlinx.coroutines.debug.version>1.10.2</kotlinx.coroutines.debug.version>
Expand All @@ -17,8 +17,10 @@
<openapi.generator.version>7.15.0</openapi.generator.version>
<slf4j.version>2.0.9</slf4j.version>
<spotless.version>2.46.1</spotless.version>
<spotless.version>2.46.1</spotless.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>
<main.class>io.ktor.server.netty.EngineMain</main.class>
</properties>
<parent>
<artifactId>kusto-client</artifactId>
Expand Down Expand Up @@ -47,6 +49,11 @@
<artifactId>ktor-client-java-jvm</artifactId>
<version>${ktor.version}</version>
</dependency>
<dependency>
<groupId>io.ktor</groupId>
<artifactId>ktor-serialization-jackson</artifactId>
<version>${ktor.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
Expand All @@ -68,16 +75,22 @@
<version>${kotlinx.coroutines.debug.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.mockk</groupId>
<artifactId>mockk-jvm</artifactId>
<version>1.14.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.version}</version>
<groupId>${project.groupId}</groupId>
<artifactId>kusto-data</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand All @@ -89,10 +102,11 @@
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>

<plugins>
<plugin>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-plugin</artifactId>
<groupId>org.jetbrains.kotlin</groupId>
<version>${kotlin.version}</version>
<executions>
<execution>
Expand Down Expand Up @@ -120,7 +134,6 @@
<compilerPlugins>
<plugin>kotlinx-serialization</plugin>
</compilerPlugins>
<jvmTarget>1.8</jvmTarget>
</configuration>
<dependencies>
<dependency>
Expand All @@ -130,6 +143,25 @@
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>java</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-serialization</artifactId>
<version>${kotlin.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.openapitools</groupId>
<artifactId>openapi-generator-maven-plugin</artifactId>
Expand Down Expand Up @@ -168,6 +200,25 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.6.1</version>
<executions>
<execution>
<id>add-openapi-generated-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources/openapi/src/main/kotlin</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
Expand Down Expand Up @@ -209,6 +260,13 @@
</licenseHeader>
</kotlin>
</configuration>
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-serialization</artifactId>
<version>${kotlin.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package com.microsoft.azure.kusto.ingest.v2

import com.azure.core.credential.TokenCredential
import com.microsoft.azure.kusto.ingest.v2.common.exceptions.IngestException
import com.microsoft.azure.kusto.ingest.v2.infrastructure.HttpResponse
import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse
import io.ktor.http.HttpStatusCode
import org.slf4j.LoggerFactory
import java.net.ConnectException

class ConfigurationClient(
override val dmUrl: String,
override val tokenCredential: TokenCredential,
override val skipSecurityChecks: Boolean = false,
) : KustoBaseApiClient(dmUrl, tokenCredential, skipSecurityChecks) {
private val logger =
LoggerFactory.getLogger(ConfigurationClient::class.java)
private val baseUrl = "$dmUrl/v1/rest/ingestion/configuration"

suspend fun getConfigurationDetails(): ConfigurationResponse {
try {
val configurationHttpResponse: HttpResponse<ConfigurationResponse> =
api.getIngestConfiguration()
if (configurationHttpResponse.success) {
logger.info(
"Successfully retrieved configuration details from $dmUrl with status: ${configurationHttpResponse.status}",
)
logger.debug(
"Configuration details: {}",
configurationHttpResponse.body(),
)
return configurationHttpResponse.body()
} else if (
configurationHttpResponse.status ==
HttpStatusCode.NotFound.value
) {
/*
404 is a special case - it indicates that the endpoint is not found. This may be a transient
network issue
*/
val message =
"Endpoint $dmUrl not found. Please ensure the cluster supports queued ingestion."
logger.error(
"{}. Status: {}",
message,
configurationHttpResponse.status,
)
throw IngestException(
message = message,
cause = ConnectException(message),
failureCode = configurationHttpResponse.status,
failureSubCode = "",
isPermanent = false,
)
} else {
val configurationResponseBody = configurationHttpResponse.body()
val message =
"Failed to retrieve configuration details from $baseUrl.Status: ${configurationHttpResponse.status}, " +
"Body: $configurationResponseBody"
logger.error("{}", message)
throw IngestException(
message = message,
failureCode = configurationHttpResponse.status,
)
}
} catch (notAbleToReachHost: ConnectException) {
val message =
"Failed to reach $baseUrl. Please ensure the cluster address is correct and the cluster is reachable."
throw IngestException(
message = message,
cause = notAbleToReachHost,
failureCode = HttpStatusCode.NotFound.value,
failureSubCode = "",
isPermanent = false,
)
} catch (ex: Exception) {
if (ex is IngestException) throw ex
val message =
"An unexpected error occurred while trying to reach $baseUrl"
throw IngestException(
message = message,
cause = ex,
// Mark this as a 5xx series error
failureCode = HttpStatusCode.InternalServerError.value,
failureSubCode = "",
isPermanent = true,
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package com.microsoft.azure.kusto.ingest.v2

import com.microsoft.azure.kusto.ingest.v2.common.exceptions.IngestException
import com.microsoft.azure.kusto.ingest.v2.infrastructure.HttpResponse
import com.microsoft.azure.kusto.ingest.v2.models.IngestResponse
import io.ktor.http.HttpStatusCode
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.net.ConnectException

interface IngestClient {
val logger: Logger
get() = LoggerFactory.getLogger(IngestClient::class.java)

// Common way to parse ingestion response for both Streaming and Queued ingestion

suspend fun <T : Any> handleIngestResponse(
response: HttpResponse<T>,
database: String,
table: String,
dmUrl: String,
endpointType: String,
): T {
if (response.success) {
val ingestResponseBody = response.body()
return ingestResponseBody
} else {
if (response.status == HttpStatusCode.NotFound.value) {
val message =
"Endpoint $dmUrl not found. Please ensure the cluster supports $endpointType ingestion."
logger.error(
"$endpointType ingestion endpoint not found. Please ensure that the target cluster supports $endpointType ingestion and that the endpoint URL is correct.",
)
throw IngestException(
message = message,
cause = ConnectException(message),
failureCode = response.status,
failureSubCode = "",
isPermanent = false,
)
}
val nonSuccessResponseBody: T = response.body()
val ingestResponseOperationId =
if (nonSuccessResponseBody is IngestResponse) {
if (
(nonSuccessResponseBody as IngestResponse)
.ingestionOperationId != null
) {
logger.info(
"Ingestion Operation ID: ${(nonSuccessResponseBody as IngestResponse).ingestionOperationId}",
)
nonSuccessResponseBody.ingestionOperationId
} else {
"N/A"
}
} else {
"N/A"
}
val errorMessage =
"Failed to submit $endpointType ingestion to $database.$table. " +
"Status: ${response.status}, Body: $nonSuccessResponseBody. " +
"OperationId $ingestResponseOperationId"
logger.error(
"$endpointType ingestion failed with response: {}",
errorMessage,
)
throw IngestException(
message = errorMessage,
cause = RuntimeException(errorMessage),
isPermanent = true,
)
}
}
}
Loading
Loading