-
Notifications
You must be signed in to change notification settings - Fork 1.8k
out_azure_kusto: added streaming ingestion support #10809
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
7f4efea
32e99e5
10c0a98
5706e9d
6a2579e
bf150d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -943,6 +943,54 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return -1; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* Create upstream context for Kusto Cluster endpoint (for streaming ingestion) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* Convert ingestion endpoint to cluster endpoint by removing "ingest-" prefix | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (ctx->streaming_ingestion_enabled == FLB_TRUE) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_sds_t cluster_endpoint = NULL; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* Check if ingestion endpoint contains "ingest-" prefix */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* Create cluster endpoint by removing "ingest-" prefix */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (!cluster_endpoint) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_error(ctx->ins, "failed to create cluster endpoint string"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return -1; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* Replace "ingest-" with empty string to get cluster endpoint */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
char *ingest_pos = strstr(cluster_endpoint, "ingest-"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (ingest_pos) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* Move the rest of the string to remove "ingest-" */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
memmove(ingest_pos, ingest_pos + 7, strlen(ingest_pos + 7) + 1); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - 7); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_info(ctx->ins, "Creating cluster upstream connection to: %s", cluster_endpoint); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* Create upstream connection to cluster endpoint */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (!ctx->u_cluster) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_sds_destroy(cluster_endpoint); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return -1; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+973
to
+978
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add cleanup for ingestion upstream on cluster upstream creation failure If cluster upstream creation fails, the function returns -1 but doesn't clean up the previously created ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls);
if (!ctx->u_cluster) {
flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint);
flb_sds_destroy(cluster_endpoint);
+ flb_upstream_destroy(ctx->u);
+ ctx->u = NULL;
return -1;
} 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_sds_destroy(cluster_endpoint); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_warn(ctx->ins, "ingestion endpoint does not contain 'ingest-' prefix, using as cluster endpoint"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* Use ingestion endpoint directly as cluster endpoint */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (!ctx->u_cluster) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_error(ctx->ins, "cluster upstream creation failed"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return -1; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+984
to
+988
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add cleanup for ingestion upstream when cluster creation fails (alternate path) Similar cleanup issue in the alternate code path when the ingestion endpoint doesn't contain "ingest-" prefix. ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
if (!ctx->u_cluster) {
flb_plg_error(ctx->ins, "cluster upstream creation failed");
+ flb_upstream_destroy(ctx->u);
+ ctx->u = NULL;
return -1;
} 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_info(ctx->ins, "Cluster upstream connection created successfully for streaming ingestion"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+946
to
+993
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix potential memory leak when cluster endpoint creation fails When Apply this fix to properly clean up on failure: if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
flb_sds_t cluster_endpoint = NULL;
/* Check if ingestion endpoint contains "ingest-" prefix */
if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) {
/* Create cluster endpoint by removing "ingest-" prefix */
cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint);
if (!cluster_endpoint) {
flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
+ flb_upstream_destroy(ctx->u);
+ ctx->u = NULL;
return -1;
} 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* Create oauth2 context */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -1396,22 +1444,50 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* Load or refresh ingestion resources */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ret = azure_kusto_load_ingestion_resources(ctx, config); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (ret != 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_error(ctx->ins, "cannot load ingestion resources"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ret = FLB_RETRY; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
goto error; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* Check if streaming ingestion is enabled */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (ctx->streaming_ingestion_enabled == FLB_TRUE) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* Perform queued ingestion to Kusto */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (ret != 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_error(ctx->ins, "cannot perform queued ingestion"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ret = FLB_RETRY; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
goto error; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* Check payload size limit for streaming ingestion (4MB) */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking payload size: %zu bytes against 4MB limit", final_payload_size); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (final_payload_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Payload size %zu bytes exceeds 4MB limit for streaming ingestion", final_payload_size); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ret = FLB_ERROR; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
goto error; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Payload size check passed (%zu bytes < 4MB)", final_payload_size); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+1447
to
+1459
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Verification agent 🧩 Analysis chainValidate payload size before compression for streaming ingestion The 4MB limit check happens after compression, but the error message and documentation suggest the limit applies to the uncompressed payload. This could lead to confusion and unexpected failures. Clarify the documentation and consider checking the size before compression: /* Check if streaming ingestion is enabled */
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag);
- /* Check payload size limit for streaming ingestion (4MB) */
- flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking payload size: %zu bytes against 4MB limit", final_payload_size);
+ /* Check payload size limit for streaming ingestion (4MB after compression if enabled) */
+ flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking %s payload size: %zu bytes against 4MB limit",
+ is_compressed ? "compressed" : "uncompressed", final_payload_size);
if (final_payload_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */
- flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Payload size %zu bytes exceeds 4MB limit for streaming ingestion", final_payload_size);
+ flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: %s payload size %zu bytes exceeds 4MB limit for streaming ingestion",
+ is_compressed ? "Compressed" : "Uncompressed", final_payload_size);
ret = FLB_ERROR;
goto error;
} Let me verify the streaming ingestion size limit behavior: 🌐 Web query:
💡 Result: Short answer: The 4 MB streaming-ingestion limit is on the uncompressed data size. Microsoft docs and guidance treat the 4 MB streaming request limit as uncompressed; if the uncompressed size exceeds 4 MB the service will route to queued ingestion (and some SDKs explicitly state data >4 MB is moved to queued ingestion regardless of compression). [1][2][3][4] Sources:
Validate uncompressed payload size before compression The 4 MB streaming-ingestion limit applies to the uncompressed data size; current checks against the post-compression size can allow payloads that exceed 4 MB uncompressed to be sent. Update the code to measure and enforce the 4 MB limit on the raw buffer before applying any compression, and adjust log messages accordingly. /* Check if streaming ingestion is enabled */
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag);
- /* Check payload size limit for streaming ingestion (4MB) */
- flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking payload size: %zu bytes against 4MB limit", final_payload_size);
+ /* Check payload size limit for streaming ingestion (4MB uncompressed) */
+ flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking uncompressed payload size: %zu bytes against 4MB limit", uncompressed_size);
if (uncompressed_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */
- flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Payload size %zu bytes exceeds 4MB limit for streaming ingestion", final_payload_size);
+ flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Uncompressed payload size %zu bytes exceeds 4MB limit for streaming ingestion", uncompressed_size);
ret = FLB_ERROR;
goto error;
}
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* Perform streaming ingestion to Kusto */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Initiating streaming ingestion to Kusto"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ret = azure_kusto_streaming_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion completed with result: %d", ret); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (ret != 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Streaming ingestion failed, will retry"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ret = FLB_RETRY; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
goto error; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] SUCCESS: Streaming ingestion completed successfully"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_debug(ctx->ins, "[FLUSH_QUEUED] Using queued ingestion mode (streaming ingestion disabled)"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* Load or refresh ingestion resources for queued ingestion */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ret = azure_kusto_load_ingestion_resources(ctx, config); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (ret != 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_error(ctx->ins, "cannot load ingestion resources"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ret = FLB_RETRY; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
goto error; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* Perform queued ingestion to Kusto */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (ret != 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_plg_error(ctx->ins, "cannot perform queued ingestion"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ret = FLB_RETRY; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
goto error; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ret = FLB_OK; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -1501,6 +1577,11 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ctx->u = NULL; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (ctx->u_cluster) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
flb_upstream_destroy(ctx->u_cluster); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ctx->u_cluster = NULL; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pthread_mutex_destroy(&ctx->resources_mutex); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pthread_mutex_destroy(&ctx->token_mutex); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pthread_mutex_destroy(&ctx->blob_mutex); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -1565,6 +1646,11 @@ static struct flb_config_map config_map[] = { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
offsetof(struct flb_azure_kusto, compression_enabled), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"Enable HTTP payload compression (gzip)." | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"The default is true."}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{FLB_CONFIG_MAP_BOOL, "streaming_ingestion_enabled", "false", 0, FLB_TRUE, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
offsetof(struct flb_azure_kusto, streaming_ingestion_enabled), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"Enable streaming ingestion. When enabled, data is sent directly to Kusto engine without using blob storage and queues. " | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"Note: Streaming ingestion has a 4MB limit per request and doesn't support buffering." | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"The default is false (uses queued ingestion)."}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"Set the azure kusto ingestion resources refresh interval" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -796,6 +796,24 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * | |||||||||||||||||||||||||||
return NULL; | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
/* Validate mutual exclusivity between buffering and streaming ingestion */ | ||||||||||||||||||||||||||||
if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) { | ||||||||||||||||||||||||||||
flb_plg_error(ctx->ins, "buffering_enabled and streaming_ingestion_enabled cannot both be true. When buffering is enabled, streaming ingestion is automatically disabled"); | ||||||||||||||||||||||||||||
flb_azure_kusto_conf_destroy(ctx); | ||||||||||||||||||||||||||||
return NULL; | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
Comment on lines
+799
to
+805
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Don’t hard-fail when both buffering and streaming are enabled; resolve deterministically. Current error contradicts the log (“streaming… automatically disabled”). Either disable buffering or streaming and continue with a warning. - if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) {
- flb_plg_error(ctx->ins, "buffering_enabled and streaming_ingestion_enabled cannot both be true. When buffering is enabled, streaming ingestion is automatically disabled");
- flb_azure_kusto_conf_destroy(ctx);
- return NULL;
- }
+ if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) {
+ /* Prefer queued ingestion when buffering is explicitly enabled */
+ ctx->streaming_ingestion_enabled = FLB_FALSE;
+ flb_plg_warn(ctx->ins, "buffering_enabled=true overrides streaming_ingestion_enabled; streaming disabled");
+ } 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||
/* Log ingestion mode selection */ | ||||||||||||||||||||||||||||
if (ctx->streaming_ingestion_enabled) { | ||||||||||||||||||||||||||||
flb_plg_info(ctx->ins, "streaming ingestion mode enabled - data will be sent directly to Kusto engine (4MB payload limit per request, no local buffering support)"); | ||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||
if (ctx->buffering_enabled) { | ||||||||||||||||||||||||||||
flb_plg_info(ctx->ins, "queued ingestion mode enabled with local file buffering - data will be sent via blob storage and ingestion queues"); | ||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||
flb_plg_info(ctx->ins, "queued ingestion mode enabled - data will be sent via blob storage and ingestion queues"); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
/* Create oauth2 context */ | ||||||||||||||||||||||||||||
if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM || | ||||||||||||||||||||||||||||
ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER) { | ||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -517,6 +517,147 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t | |
return ret; | ||
} | ||
|
||
int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, | ||
size_t tag_len, flb_sds_t payload, size_t payload_size) | ||
{ | ||
int ret = -1; | ||
struct flb_connection *u_conn; | ||
struct flb_http_client *c; | ||
flb_sds_t uri = NULL; | ||
flb_sds_t token = NULL; | ||
size_t resp_size; | ||
time_t now; | ||
struct tm tm; | ||
char tmp[64]; | ||
int len; | ||
|
||
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting for tag: %.*s, payload: %zu bytes, db: %s, table: %s, compression: %s", | ||
(int)tag_len, tag, payload_size, ctx->database_name, ctx->table_name, ctx->compression_enabled ? "enabled" : "disabled"); | ||
|
||
now = time(NULL); | ||
gmtime_r(&now, &tm); | ||
len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm); | ||
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Request timestamp: %s", tmp); | ||
|
||
Comment on lines
+534
to
+541
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Pre-validate the 4 MiB streaming limit (avoid 413s and wasted I/O). Check payload_size upfront (and compressed size when applicable) before opening a connection. flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting for tag: %.*s, payload: %zu bytes, db: %s, table: %s, compression: %s",
(int)tag_len, tag, payload_size, ctx->database_name, ctx->table_name, ctx->compression_enabled ? "enabled" : "disabled");
+
+ /* Kusto streaming hard limit: 4 MiB per request payload */
+ if (payload_size > (4 * 1024 * 1024)) {
+ flb_plg_error(ctx->ins, "[STREAMING_INGESTION] payload_size=%zu exceeds 4MiB limit", payload_size);
+ return -1;
+ } 🤖 Prompt for AI Agents
|
||
/* Get upstream connection to the main Kusto cluster endpoint (for streaming ingestion) */ | ||
if (!ctx->u_cluster) { | ||
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: cluster upstream not available - streaming ingestion requires cluster endpoint"); | ||
return -1; | ||
} | ||
|
||
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Getting upstream connection to cluster endpoint"); | ||
u_conn = flb_upstream_conn_get(ctx->u_cluster); | ||
if (!u_conn) { | ||
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to get cluster upstream connection"); | ||
return -1; | ||
} | ||
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained upstream connection"); | ||
|
||
/* Get authentication token */ | ||
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Retrieving OAuth2 authentication token"); | ||
token = get_azure_kusto_token(ctx); | ||
if (!token) { | ||
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to retrieve OAuth2 token"); | ||
flb_upstream_conn_release(u_conn); | ||
return -1; | ||
} | ||
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained OAuth2 token (length: %zu)", flb_sds_len(token)); | ||
|
||
/* Build the streaming ingestion URI */ | ||
uri = flb_sds_create_size(256); | ||
if (!uri) { | ||
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create URI buffer"); | ||
flb_sds_destroy(token); | ||
flb_upstream_conn_release(u_conn); | ||
return -1; | ||
} | ||
|
||
/* Create the streaming ingestion URI */ | ||
if (ctx->ingestion_mapping_reference) { | ||
flb_sds_snprintf(&uri, flb_sds_alloc(uri), | ||
"/v1/rest/ingest/%s/%s?streamFormat=MultiJSON&mappingName=%s", | ||
ctx->database_name, ctx->table_name, ctx->ingestion_mapping_reference); | ||
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Using ingestion mapping: %s", ctx->ingestion_mapping_reference); | ||
} else { | ||
flb_sds_snprintf(&uri, flb_sds_alloc(uri), | ||
"/v1/rest/ingest/%s/%s?streamFormat=MultiJSON", | ||
ctx->database_name, ctx->table_name); | ||
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] No ingestion mapping specified"); | ||
} | ||
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Request URI: %s", uri); | ||
|
||
/* Create HTTP client for streaming ingestion */ | ||
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Creating HTTP client for POST request"); | ||
c = flb_http_client(u_conn, FLB_HTTP_POST, uri, payload, payload_size, | ||
NULL, 0, NULL, 0); | ||
|
||
if (c) { | ||
/* Add required headers for streaming ingestion */ | ||
flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); | ||
flb_http_add_header(c, "Accept", 6, "application/json", 16); | ||
flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); | ||
flb_http_add_header(c, "x-ms-date", 9, tmp, len); | ||
flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); | ||
flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); | ||
flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); | ||
|
||
/* Set Content-Type based on whether compression is enabled */ | ||
if (ctx->compression_enabled) { | ||
flb_http_add_header(c, "Content-Type", 12, "application/json", 16); | ||
flb_http_add_header(c, "Content-Encoding", 16, "gzip", 4); | ||
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Headers set for compressed payload"); | ||
} else { | ||
flb_http_add_header(c, "Content-Type", 12, "application/json; charset=utf-8", 31); | ||
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Headers set for uncompressed payload"); | ||
} | ||
|
||
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Payload sample (first 200 chars): %.200s", (char*)payload); | ||
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Sending HTTP POST request to Kusto engine"); | ||
|
||
/* Send the HTTP request */ | ||
ret = flb_http_do(c, &resp_size); | ||
|
||
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] HTTP request completed - http_do result: %d, HTTP Status: %i, Response size: %zu", ret, c->resp.status, resp_size); | ||
|
||
if (ret == 0) { | ||
/* Check for successful HTTP status codes */ | ||
if (c->resp.status == 200 || c->resp.status == 204) { | ||
ret = 0; | ||
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] SUCCESS: Data successfully ingested to Kusto (HTTP %d)", c->resp.status); | ||
} else { | ||
ret = -1; | ||
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed with status: %i", c->resp.status); | ||
|
||
if (c->resp.payload_size > 0) { | ||
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] Error response body (size: %zu): %.*s", | ||
c->resp.payload_size, (int)c->resp.payload_size, c->resp.payload); | ||
} | ||
} | ||
} else { | ||
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed at transport level (ret=%d)", ret); | ||
} | ||
|
||
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Destroying HTTP client"); | ||
flb_http_client_destroy(c); | ||
} else { | ||
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create HTTP client context"); | ||
} | ||
|
||
/* Cleanup */ | ||
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Cleaning up resources"); | ||
if (uri) { | ||
flb_sds_destroy(uri); | ||
} | ||
if (token) { | ||
flb_sds_destroy(token); | ||
} | ||
flb_upstream_conn_release(u_conn); | ||
|
||
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Streaming ingestion completed with result: %d", ret); | ||
return ret; | ||
} | ||
|
||
|
||
/* Function to generate a random alphanumeric string */ | ||
void generate_random_string(char *str, size_t length) | ||
{ | ||
|
@@ -658,4 +799,4 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, | |
} | ||
|
||
return ret; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve URL prefix removal for better reliability
The current implementation uses
strstr
which finds "ingest-" anywhere in the URL, not just in the hostname. This could match unintended parts like path segments. Additionally, the memmove operation assumes the prefix is at the exact position found by strstr.Consider using a more robust approach that specifically targets the hostname:
🤖 Prompt for AI Agents