Skip to content

Commit 1ac4a97

Browse files
added bloburl to streaming
1 parent 3a7adb4 commit 1ac4a97

File tree

2 files changed

+151
-18
lines changed

2 files changed

+151
-18
lines changed

ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/StreamingIngestClient.kt

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,19 @@ import com.microsoft.azure.kusto.ingest.v2.infrastructure.HttpResponse
88
import com.microsoft.azure.kusto.ingest.v2.models.Format
99
import com.microsoft.azure.kusto.ingest.v2.models.IngestRequestProperties
1010
import io.ktor.http.HttpStatusCode
11+
import kotlinx.serialization.Serializable
12+
import kotlinx.serialization.SerialName
13+
import kotlinx.serialization.json.Json
1114
import java.net.ConnectException
1215
import java.net.URI
1316

17+
18+
/**
 * Request body used when streaming ingestion reads from a blob instead of
 * inline data.
 *
 * The single property is serialized under the JSON key `SourceUri`.
 *
 * @property sourceUri URL of the blob to ingest from.
 */
@Serializable
private data class StreamFromBlobRequestBody(
    @SerialName("SourceUri") val sourceUri: String,
)
23+
1424
class StreamingIngestClient(
1525
val engineUrl: String,
1626
override val tokenCredential: TokenCredential,
@@ -27,6 +37,7 @@ class StreamingIngestClient(
2737
* @param data The data to ingest (as ByteArray)
2838
* @param format The data format
2939
* @param ingestProperties Optional ingestion properties
40+
* @param blobUrl Optional blob URL for blob-based streaming ingestion (if provided, data is ignored)
3041
* @return IngestResponse for tracking the request
3142
*/
3243
suspend fun submitStreamingIngestion(
@@ -35,31 +46,55 @@ class StreamingIngestClient(
3546
data: ByteArray,
3647
format: Format = Format.csv,
3748
ingestProperties: IngestRequestProperties? = null,
49+
blobUrl: String? = null,
3850
) {
3951
val host = URI(engineUrl).host
40-
logger.info(
41-
"Submitting streaming ingestion request for database: {}, table: {}, data size: {}. Host {}",
42-
database,
43-
table,
44-
data.size,
45-
host,
46-
)
52+
53+
val bodyContent: Any
54+
val sourceKind: String?
55+
val contentType: String
56+
57+
if (blobUrl != null) {
58+
// Blob-based streaming
59+
val requestBody = StreamFromBlobRequestBody(sourceUri = blobUrl)
60+
bodyContent = Json.encodeToString(requestBody).toByteArray()
61+
sourceKind = "uri"
62+
contentType = "application/json"
63+
logger.info(
64+
"Submitting streaming ingestion from blob for database: {}, table: {}, blob: {}. Host {}",
65+
database,
66+
table,
67+
blobUrl,
68+
host,
69+
)
70+
} else {
71+
// Direct streaming using raw data
72+
bodyContent = data
73+
sourceKind = null
74+
contentType = "application/octet-stream"
75+
logger.info(
76+
"Submitting streaming ingestion request for database: {}, table: {}, data size: {}. Host {}",
77+
database,
78+
table,
79+
data.size,
80+
host,
81+
)
82+
}
83+
4784
try {
4885
val response: HttpResponse<Unit> =
4986
api.postStreamingIngest(
5087
database = database,
5188
table = table,
5289
streamFormat = format,
53-
body = data,
54-
mappingName =
55-
ingestProperties?.ingestionMappingReference,
56-
// TODO: What is sourceKind for streaming ingestion?
57-
sourceKind = null,
90+
body = bodyContent,
91+
mappingName = ingestProperties?.ingestionMappingReference,
92+
sourceKind = sourceKind,
5893
host = host,
5994
acceptEncoding = "gzip",
6095
connection = "Keep-Alive",
6196
contentEncoding = null,
62-
contentType = "application/octet-stream",
97+
contentType = contentType,
6398
)
6499
return handleIngestResponse(
65100
response = response,

ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/StreamingIngestClientTest.kt

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import kotlin.test.assertNotNull
2121
class StreamingIngestClientTest :
2222
IngestV2TestBase(StreamingIngestClientTest::class.java) {
2323

24+
private val publicBlobUrl = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json"
25+
2426
private val targetUuid = UUID.randomUUID().toString()
2527
private val randomRow: String =
2628
"""{"timestamp": "2023-05-02 15:23:50.0000000","deviceId": "$targetUuid","messageId": "7f316225-839a-4593-92b5-1812949279b3","temperature": 31.0301639051317,"humidity": 62.0791099602725}"""
@@ -39,6 +41,22 @@ class StreamingIngestClientTest :
3941
engineEndpoint,
4042
false,
4143
false,
44+
null, // blobUrl
45+
),
46+
)
47+
}
48+
49+
/**
 * Argument source for the blob streaming test.
 *
 * Each entry is (test name, cluster endpoint, blob URL); a `null` blob URL
 * selects the direct (inline data) streaming path.
 */
private fun blobStreamingTestParameters(): Stream<Arguments?> {
    val directStreamingCase =
        Arguments.of(
            "Direct streaming - backward compatibility",
            engineEndpoint,
            null, // passing blobUrl = null for direct streaming
        )
    val blobStreamingCase =
        Arguments.of(
            "Blob-based streaming - E2E with public blob",
            engineEndpoint,
            publicBlobUrl,
        )
    return Stream.of(directStreamingCase, blobStreamingCase)
}
@@ -50,6 +68,7 @@ class StreamingIngestClientTest :
5068
cluster: String,
5169
isException: Boolean,
5270
isUnreachableHost: Boolean,
71+
blobUrl: String?,
5372
) = runBlocking {
5473
logger.info("Running streaming ingest test {}", testName)
5574
val client = StreamingIngestClient(cluster, tokenProvider, true)
@@ -78,11 +97,12 @@ class StreamingIngestClientTest :
7897
} else {
7998
// Perform streaming ingestion
8099
client.submitStreamingIngestion(
81-
database,
82-
targetTable,
83-
randomRow.toByteArray(),
84-
targetTestFormat,
85-
ingestProps,
100+
database = database,
101+
table = targetTable,
102+
data = randomRow.toByteArray(),
103+
format = targetTestFormat,
104+
ingestProperties = ingestProps,
105+
blobUrl = blobUrl,
86106
)
87107
// Query the ingested data
88108
val results =
@@ -101,4 +121,82 @@ class StreamingIngestClientTest :
101121
}
102122
}
103123
}
124+
125+
/**
 * End-to-end test of `submitStreamingIngestion` in both of its modes.
 *
 * - When [blobUrl] is non-null, ingestion is submitted with an empty payload
 *   and the blob URL, and the test verifies that the table's row count grew.
 * - When [blobUrl] is null, a uniquely tagged row is streamed inline and the
 *   test verifies that exactly one row with that tag exists.
 *
 * @param testName Display name of the parameterized case.
 * @param cluster Engine endpoint passed to the client under test.
 * @param blobUrl Blob to ingest from, or `null` for direct streaming.
 */
@ParameterizedTest(name = "{0}")
@MethodSource("blobStreamingTestParameters")
fun `run blob streaming ingest test with various modes`(
    testName: String,
    cluster: String,
    blobUrl: String?,
) = runBlocking {
    logger.info("Running blob streaming ingest test: {}", testName)
    val client = StreamingIngestClient(cluster, tokenProvider, true)
    val ingestProps = IngestRequestProperties(format = targetTestFormat)

    val testUuid = UUID.randomUUID().toString()
    val testRow = """{"timestamp": "2023-05-02 15:23:50.0000000","deviceId": "$testUuid","messageId": "blob-test-message","temperature": 27.5,"humidity": 58.0}"""

    // Runs a query that yields a `count` column and returns its first value.
    // Uses kotlin.test assertions (not stdlib `assert`, which is a no-op
    // unless the JVM runs with -ea) and fails clearly on an empty result.
    fun queryCount(query: String): Long {
        val results = adminClusterClient.executeQuery(database, query).primaryResults
        assertNotNull(results, "Query results should not be null")
        kotlin.test.assertTrue(results.next(), "Query returned no rows: $query")
        return results.getLong("count")
    }

    if (blobUrl != null) {
        logger.info("Blob-based streaming ingestion with URL: {}", blobUrl)

        // The target table is shared with other tests, so a bare "count > 0"
        // check would pass even if this ingestion added nothing. Compare
        // against a baseline taken before the ingestion instead.
        val totalCountQuery = "$targetTable | summarize count=count()"
        val countBefore = queryCount(totalCountQuery)

        client.submitStreamingIngestion(
            database = database,
            table = targetTable,
            data = ByteArray(0), // Ignored when blobUrl is provided
            format = targetTestFormat,
            ingestProperties = ingestProps,
            blobUrl = blobUrl,
        )

        logger.info("Blob-based streaming ingestion submitted successfully")

        // Give the service a moment before querying for the ingested rows.
        kotlinx.coroutines.delay(3000)

        val count = queryCount(totalCountQuery)
        kotlin.test.assertTrue(
            count > countBefore,
            "Expected records in table after blob-based streaming ingestion, " +
                "but row count went from $countBefore to $count",
        )

        logger.info("Blob-based streaming ingestion verified - {} records in table", count)
    } else {
        // Direct streaming ingestion (backward compatibility test)
        logger.info("Testing direct streaming")
        client.submitStreamingIngestion(
            database = database,
            table = targetTable,
            data = testRow.toByteArray(),
            format = targetTestFormat,
            ingestProperties = ingestProps,
            blobUrl = null,
        )

        // Query the ingested data: the random UUID isolates this test's row.
        val count =
            queryCount(
                "$targetTable | where deviceId == '$testUuid' | summarize count=count() by deviceId",
            )
        kotlin.test.assertEquals(
            1L,
            count,
            "Expected 1 record for $testUuid in test '$testName', but got $count",
        )
    }

    logger.info("Blob streaming test '{}' completed successfully", testName)
}
104202
}

0 commit comments

Comments
 (0)