@@ -30,34 +30,36 @@ class StreamingIngestClientTest :
3030
3131 private fun testParameters (): Stream <Arguments ?> {
3232 return Stream .of(
33- // Arguments.of(
34- // "Cluster without streaming ingest",
35- // "https://help.kusto.windows.net",
36- // true,
37- // false,
38- // ),
3933 Arguments .of(
40- " Streaming ingest - Regular flow " ,
34+ " Direct ingest - success " ,
4135 engineEndpoint,
42- false ,
43- false ,
44- null , // blobUrl
36+ false , // isException
37+ false , // isUnreachableHost
38+ null , // blobUrl
4539 ),
46- )
47- }
48-
49- private fun blobStreamingTestParameters (): Stream <Arguments ?> {
50- return Stream .of(
5140 Arguments .of(
52- " Direct streaming - backward compatibility " ,
41+ " Blob based ingest - success " ,
5342 engineEndpoint,
54- null , // passing blobUrl = null for direct streaming
43+ false , // isException
44+ false , // isUnreachableHost
45+ publicBlobUrl,
5546 ),
47+ // Blob-based streaming - error case
5648 Arguments .of(
57- " Blob- based streaming - E2E with public blob" ,
49+ " Blob based ingest- Invalid blob URL " ,
5850 engineEndpoint,
59- publicBlobUrl,
51+ true , // isException
52+ false , // isUnreachableHost
53+ " https://nonexistentaccount.blob.core.windows.net/container/file.json" ,
6054 ),
55+ // Uncomment to test cluster without streaming ingest
56+ // Arguments.of(
57+ // "Cluster without streaming ingest",
58+ // "https://help.kusto.windows.net",
59+ // true,
60+ // false,
61+ // null,
62+ // ),
6163 )
6264 }
6365
@@ -74,129 +76,107 @@ class StreamingIngestClientTest :
7476 val client = StreamingIngestClient (cluster, tokenProvider, true )
7577 val ingestProps = IngestRequestProperties (format = targetTestFormat)
7678 if (isException) {
77- val table = " testtable"
78- val data = " col1,col2\n val1,val2" .toByteArray()
79- val exception =
80- assertThrows<IngestException > {
81- client.submitStreamingIngestion(
82- database,
83- table,
84- data,
85- targetTestFormat,
86- ingestProps,
87- )
79+ if (blobUrl != null ) {
80+ logger.info(" Testing error handling for invalid blob URL: {}" , blobUrl)
81+ val exception =
82+ assertThrows<IngestException > {
83+ client.submitStreamingIngestion(
84+ database = database,
85+ table = targetTable,
86+ data = ByteArray (0 ),
87+ format = targetTestFormat,
88+ ingestProperties = ingestProps,
89+ blobUrl = blobUrl,
90+ )
91+ }
92+ assertNotNull(exception, " Exception should not be null for invalid blob URL" )
93+ logger.info(" Expected exception caught for invalid blob URL: {}" , exception.message)
94+ logger.info(" Failure code: {}, isPermanent: {}" , exception.failureCode, exception.isPermanent)
95+
96+ assert (exception.failureCode != 0 ) {
97+ " Expected non-zero failure code for invalid blob URL"
8898 }
89- assertNotNull(exception, " Exception should not be null" )
90- if (isUnreachableHost) {
91- assert (exception.cause is java.net.ConnectException )
92- assert (exception.isPermanent == false )
9399 } else {
94- assert (exception.failureCode == 404 )
95- assert (exception.isPermanent == false )
100+ logger.info(" Testing error handling for direct streaming ingestion" )
101+ val table = " testtable"
102+ val data = " col1,col2\n val1,val2" .toByteArray()
103+ val exception =
104+ assertThrows<IngestException > {
105+ client.submitStreamingIngestion(
106+ database,
107+ table,
108+ data,
109+ targetTestFormat,
110+ ingestProps,
111+ )
112+ }
113+ assertNotNull(exception, " Exception should not be null" )
114+ if (isUnreachableHost) {
115+ assert (exception.cause is java.net.ConnectException )
116+ assert (exception.isPermanent == false )
117+ } else {
118+ assert (exception.failureCode == 404 )
119+ assert (exception.isPermanent == false )
120+ }
96121 }
97122 } else {
98- // Perform streaming ingestion
99- client.submitStreamingIngestion(
100- database = database,
101- table = targetTable,
102- data = randomRow.toByteArray(),
103- format = targetTestFormat,
104- ingestProperties = ingestProps,
105- blobUrl = blobUrl,
106- )
107- // Query the ingested data
108- val results =
109- adminClusterClient
123+ if (blobUrl != null ) {
124+ logger.info(" Blob-based streaming ingestion with URL: {}" , blobUrl)
125+
126+ client.submitStreamingIngestion(
127+ database = database,
128+ table = targetTable,
129+ data = ByteArray (0 ), // Ignored when blobUrl is provided
130+ format = targetTestFormat,
131+ ingestProperties = ingestProps,
132+ blobUrl = blobUrl,
133+ )
134+
135+ logger.info(" Blob-based streaming ingestion submitted successfully" )
136+
137+ kotlinx.coroutines.delay(3000 )
138+ val results = adminClusterClient
110139 .executeQuery(
111140 database,
112- " $targetTable | where deviceId == ' $targetUuid ' | summarize count=count() by deviceId " ,
141+ " $targetTable | summarize count=count()"
113142 )
114143 .primaryResults
115- assertNotNull(results, " Query results should not be null" )
116- results.next()
117- val count: Long = results.getLong(" count" )
118- assertNotNull(count, " Count should not be null" )
119- assert (count == 1L ) {
120- " Expected 1 record for $targetUuid , but got $count "
121- }
122- }
123- }
124-
125- @ParameterizedTest(name = " {0}" )
126- @MethodSource(" blobStreamingTestParameters" )
127- fun `run blob streaming ingest test with various modes` (
128- testName : String ,
129- cluster : String ,
130- blobUrl : String? ,
131- ) = runBlocking {
132- logger.info(" Running blob streaming ingest test: {}" , testName)
133- val client = StreamingIngestClient (cluster, tokenProvider, true )
134- val ingestProps = IngestRequestProperties (format = targetTestFormat)
135-
136- val testUuid = UUID .randomUUID().toString()
137- val testRow = """ {"timestamp": "2023-05-02 15:23:50.0000000","deviceId": "$testUuid ","messageId": "blob-test-message","temperature": 27.5,"humidity": 58.0}"""
138-
139- if (blobUrl != null ) {
140- logger.info(" Blob-based streaming ingestion with URL: {}" , blobUrl)
141-
142- client.submitStreamingIngestion(
143- database = database,
144- table = targetTable,
145- data = ByteArray (0 ), // Ignored when blobUrl is provided
146- format = targetTestFormat,
147- ingestProperties = ingestProps,
148- blobUrl = blobUrl,
149- )
150-
151- logger.info(" Blob-based streaming ingestion submitted successfully" )
152-
153- kotlinx.coroutines.delay(3000 )
154-
155- val results = adminClusterClient
156- .executeQuery(
157- database,
158- " $targetTable | summarize count=count()"
159- )
160- .primaryResults
161-
162- assertNotNull(results, " Query results should not be null" )
163- results.next()
164- val count: Long = results.getLong(" count" )
165- assertNotNull(count, " Count should not be null" )
166- assert (count > 0 ) {
167- " Expected records in table after blob-based streaming ingestion, but got $count "
168- }
169-
170- logger.info(" Blob-based streaming ingestion verified - {} records in table" , count)
171- } else {
172- // Direct streaming ingestion (backward compatibility test)
173- logger.info(" Testing direct streaming" )
174- client.submitStreamingIngestion(
175- database = database,
176- table = targetTable,
177- data = testRow.toByteArray(),
178- format = targetTestFormat,
179- ingestProperties = ingestProps,
180- blobUrl = null ,
181- )
182-
183- // Query the ingested data
184- val results = adminClusterClient
185- .executeQuery(
186- database,
187- " $targetTable | where deviceId == '$testUuid ' | summarize count=count() by deviceId"
144+
145+ assertNotNull(results, " Query results should not be null" )
146+ results.next()
147+ val count: Long = results.getLong(" count" )
148+ assertNotNull(count, " Count should not be null" )
149+ assert (count > 0 ) {
150+ " Expected records in table after blob-based streaming ingestion, but got $count "
151+ }
152+
153+ logger.info(" Blob-based streaming ingestion verified - {} records in table" , count)
154+ } else {
155+ logger.info(" Direct streaming ingestion - success case" )
156+ client.submitStreamingIngestion(
157+ database = database,
158+ table = targetTable,
159+ data = randomRow.toByteArray(),
160+ format = targetTestFormat,
161+ ingestProperties = ingestProps,
162+ blobUrl = null ,
188163 )
189- .primaryResults
190-
191- assertNotNull(results, " Query results should not be null" )
192- results.next()
193- val count: Long = results.getLong(" count" )
194- assertNotNull(count, " Count should not be null" )
195- assert (count == 1L ) {
196- " Expected 1 record for $testUuid in test '$testName ', but got $count "
164+
165+ val results =
166+ adminClusterClient
167+ .executeQuery(
168+ database,
169+ " $targetTable | where deviceId == '$targetUuid ' | summarize count=count() by deviceId" ,
170+ )
171+ .primaryResults
172+ assertNotNull(results, " Query results should not be null" )
173+ results.next()
174+ val count: Long = results.getLong(" count" )
175+ assertNotNull(count, " Count should not be null" )
176+ assert (count == 1L ) {
177+ " Expected 1 record for $targetUuid , but got $count "
178+ }
197179 }
198180 }
199-
200- logger.info(" Blob streaming test '{}' completed successfully" , testName)
201181 }
202182}
0 commit comments