116 changes: 101 additions & 15 deletions plugins/out_azure_kusto/azure_kusto.c
@@ -943,6 +943,54 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
return -1;
}

/*
* Create upstream context for Kusto Cluster endpoint (for streaming ingestion)
* Convert ingestion endpoint to cluster endpoint by removing "ingest-" prefix
*/
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
flb_sds_t cluster_endpoint = NULL;

/* Check if ingestion endpoint contains "ingest-" prefix */
if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) {
/* Create cluster endpoint by removing "ingest-" prefix */
cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint);
if (!cluster_endpoint) {
flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
return -1;
}

/* Replace "ingest-" with empty string to get cluster endpoint */
char *ingest_pos = strstr(cluster_endpoint, "ingest-");
if (ingest_pos) {
/* Move the rest of the string to remove "ingest-" */
memmove(ingest_pos, ingest_pos + 7, strlen(ingest_pos + 7) + 1);
flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - 7);
}

Comment on lines +950 to +969
🛠️ Refactor suggestion

Improve URL prefix removal for better reliability

The current implementation uses strstr, which finds "ingest-" anywhere in the URL rather than only at the start of the hostname, so it could match unintended parts such as path segments. The memmove then removes whatever occurrence strstr returned, even if it is not the hostname prefix.

Consider using a more robust approach that specifically targets the hostname:

-    /* Check if ingestion endpoint contains "ingest-" prefix */
-    if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) {
+    /* Check if hostname starts with "ingest-" prefix */
+    const char *prefix = "ingest-";
+    const char *schema_end = strstr(ctx->ingestion_endpoint, "://");
+    const char *hostname_start = schema_end ? schema_end + 3 : ctx->ingestion_endpoint;
+    
+    if (strncmp(hostname_start, prefix, strlen(prefix)) == 0) {
         /* Create cluster endpoint by removing "ingest-" prefix */
         cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint);
         if (!cluster_endpoint) {
             flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
             return -1;
         }
 
-        /* Replace "ingest-" with empty string to get cluster endpoint */
-        char *ingest_pos = strstr(cluster_endpoint, "ingest-");
-        if (ingest_pos) {
-            /* Move the rest of the string to remove "ingest-" */
-            memmove(ingest_pos, ingest_pos + 7, strlen(ingest_pos + 7) + 1);
-            flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - 7);
-        }
+        /* Find the position in our copy and remove the prefix */
+        char *copy_hostname = strstr(cluster_endpoint, "://");
+        if (copy_hostname) {
+            copy_hostname += 3;
+            memmove(copy_hostname, copy_hostname + strlen(prefix), 
+                    strlen(copy_hostname + strlen(prefix)) + 1);
+            flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - strlen(prefix));
+        }
🤖 Prompt for AI Agents
In plugins/out_azure_kusto/azure_kusto.c around lines 950 to 969, the code uses
strstr on the whole ingestion_endpoint and memmove assuming the found position
is the hostname prefix; instead parse or isolate the URL host portion, verify
the host starts with "ingest-" and then remove only that leading prefix from the
hostname (not from the entire URL). Update the logic to: extract the host
substring (or use a URL parse helper), check if host[0..6] == "ingest-", create
a new SDS for the cluster endpoint by copying the original URL and replacing
only the host portion with host+7 (or build the URL from scheme +
host_without_prefix + path), and adjust SDS length safely with flb_sds_len_set
using the actual removed byte count; ensure bounds checks and null checks are
present and avoid memmove on the whole URL.
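
For reference, here is a minimal standalone sketch of the hostname-only rewrite described above. The helper name `cluster_endpoint_from_ingestion` and its exact factoring are illustrative assumptions, not code from this PR; it relies only on the flb_sds API the patch already uses.

```c
#include <string.h>
#include <fluent-bit/flb_sds.h>

/* Build a cluster endpoint from an ingestion endpoint by dropping a leading
 * "ingest-" on the hostname only, e.g.
 *   https://ingest-foo.kusto.windows.net -> https://foo.kusto.windows.net
 * Returns a new sds the caller must destroy, or NULL on allocation failure.
 * Hypothetical helper, not part of the PR. */
static flb_sds_t cluster_endpoint_from_ingestion(const char *ingestion_endpoint)
{
    const char *prefix = "ingest-";
    size_t prefix_len = strlen(prefix);
    const char *scheme_end = strstr(ingestion_endpoint, "://");
    const char *host = scheme_end ? scheme_end + 3 : ingestion_endpoint;
    flb_sds_t out;

    out = flb_sds_create(ingestion_endpoint);
    if (!out) {
        return NULL;
    }

    /* Only rewrite when the hostname itself starts with the prefix */
    if (strncmp(host, prefix, prefix_len) == 0) {
        /* same offset in the copy as in the original string */
        char *copy_host = out + (host - ingestion_endpoint);
        memmove(copy_host, copy_host + prefix_len,
                strlen(copy_host + prefix_len) + 1);
        flb_sds_len_set(out, flb_sds_len(out) - prefix_len);
    }

    return out;
}
```

When the hostname lacks the prefix, the input is returned unchanged, which lines up with the existing fallback branch that reuses the ingestion endpoint as the cluster endpoint.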

flb_plg_info(ctx->ins, "Creating cluster upstream connection to: %s", cluster_endpoint);

/* Create upstream connection to cluster endpoint */
ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls);
if (!ctx->u_cluster) {
flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint);
flb_sds_destroy(cluster_endpoint);
return -1;
}
Comment on lines +973 to +978
⚠️ Potential issue

Add cleanup for ingestion upstream on cluster upstream creation failure

If cluster upstream creation fails, the function returns -1 but doesn't clean up the previously created ctx->u upstream connection.

         ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls);
         if (!ctx->u_cluster) {
             flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint);
             flb_sds_destroy(cluster_endpoint);
+            flb_upstream_destroy(ctx->u);
+            ctx->u = NULL;
             return -1;
         }
🤖 Prompt for AI Agents
In plugins/out_azure_kusto/azure_kusto.c around lines 973-978, when
flb_upstream_create_url for ctx->u_cluster fails the code currently returns -1
without cleaning up the previously created ctx->u upstream; modify the error
path to call flb_upstream_destroy(ctx->u) (and set ctx->u = NULL) before
destroying cluster_endpoint and returning -1 so the earlier upstream is properly
released; keep the flb_sds_destroy(cluster_endpoint) and the error log as-is.


flb_sds_destroy(cluster_endpoint);
} else {
flb_plg_warn(ctx->ins, "ingestion endpoint does not contain 'ingest-' prefix, using as cluster endpoint");
/* Use ingestion endpoint directly as cluster endpoint */
ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
if (!ctx->u_cluster) {
flb_plg_error(ctx->ins, "cluster upstream creation failed");
return -1;
}
Comment on lines +984 to +988
⚠️ Potential issue

Add cleanup for ingestion upstream when cluster creation fails (alternate path)

The same cleanup issue exists in the alternate code path, taken when the ingestion endpoint does not contain the "ingest-" prefix.

         ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
         if (!ctx->u_cluster) {
             flb_plg_error(ctx->ins, "cluster upstream creation failed");
+            flb_upstream_destroy(ctx->u);
+            ctx->u = NULL;
             return -1;
         }
🤖 Prompt for AI Agents
In plugins/out_azure_kusto/azure_kusto.c around lines 984-988, when creating the
cluster upstream fails in this alternate path you must clean up the previously
created ingestion upstream to avoid resource leaks; if ctx->u is non-NULL call
flb_upstream_destroy(ctx->u), set ctx->u to NULL, optionally log the cleanup,
and then return -1.

}

flb_plg_info(ctx->ins, "Cluster upstream connection created successfully for streaming ingestion");
}

Comment on lines +946 to +993
⚠️ Potential issue

Fix potential memory leak when cluster endpoint creation fails

When flb_sds_create fails (line 956), the function returns -1 without proper cleanup. The upstream connection ctx->u created earlier (line 935) will leak.

Apply this fix to properly clean up on failure:

 if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
     flb_sds_t cluster_endpoint = NULL;
 
     /* Check if ingestion endpoint contains "ingest-" prefix */
     if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) {
         /* Create cluster endpoint by removing "ingest-" prefix */
         cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint);
         if (!cluster_endpoint) {
             flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
+            flb_upstream_destroy(ctx->u);
+            ctx->u = NULL;
             return -1;
         }
🤖 Prompt for AI Agents
In plugins/out_azure_kusto/azure_kusto.c around lines 946-993, the code returns
-1 when flb_sds_create(cluster_endpoint) fails but does not clean up the
previously created upstream (ctx->u), causing a memory/resource leak; modify the
error path to check if ctx->u is non-NULL and properly destroy it (e.g., call
the appropriate flb_upstream_destroy or cleanup function) before returning, and
apply the same cleanup pattern for any other early returns in this block
(including failures creating ctx->u_cluster) so all allocated upstreams are
released on error.
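
If touching every early return individually feels error-prone, a single cleanup label keeps the paths consistent. The excerpt below is a rough sketch of how the streaming-ingestion block in cb_azure_kusto_init could be arranged, assuming only ctx->u, ctx->u_cluster and the temporary cluster_endpoint exist at this point; it is not a drop-in replacement for the PR's code, and the helper it calls is the hypothetical one sketched earlier.

```c
    /* Sketch: streaming-ingestion setup with one shared error path.
       Assumes ctx->u was created just above and nothing else needs release. */
    flb_sds_t cluster_endpoint = NULL;

    if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
        /* hypothetical helper; see the hostname-prefix sketch above */
        cluster_endpoint = cluster_endpoint_from_ingestion(ctx->ingestion_endpoint);
        if (!cluster_endpoint) {
            flb_plg_error(ctx->ins, "failed to create cluster endpoint string");
            goto init_error;
        }

        ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint,
                                                 io_flags, ins->tls);
        if (!ctx->u_cluster) {
            flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s",
                          cluster_endpoint);
            goto init_error;
        }

        flb_sds_destroy(cluster_endpoint);
        cluster_endpoint = NULL;
    }

    /* ... the rest of cb_azure_kusto_init() continues here; the success
       path returns before ever reaching the label below ... */

init_error:
    if (cluster_endpoint) {
        flb_sds_destroy(cluster_endpoint);
    }
    if (ctx->u_cluster) {
        flb_upstream_destroy(ctx->u_cluster);
        ctx->u_cluster = NULL;
    }
    if (ctx->u) {
        flb_upstream_destroy(ctx->u);
        ctx->u = NULL;
    }
    return -1;
```

The same label could then serve later setup failures in the function, so no early return has to remember the growing list of resources to release.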

flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base));

/* Create oauth2 context */
@@ -1396,22 +1444,50 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
}
flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size);

/* Load or refresh ingestion resources */
ret = azure_kusto_load_ingestion_resources(ctx, config);
flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot load ingestion resources");
ret = FLB_RETRY;
goto error;
}
/* Check if streaming ingestion is enabled */
if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag);

/* Perform queued ingestion to Kusto */
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL);
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
ret = FLB_RETRY;
goto error;
/* Check payload size limit for streaming ingestion (4MB) */
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking payload size: %zu bytes against 4MB limit", final_payload_size);
if (final_payload_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */
flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Payload size %zu bytes exceeds 4MB limit for streaming ingestion", final_payload_size);
ret = FLB_ERROR;
goto error;
}
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Payload size check passed (%zu bytes < 4MB)", final_payload_size);

Comment on lines +1447 to +1459
💡 Verification agent

🧩 Analysis chain

Validate payload size before compression for streaming ingestion

The 4MB limit check happens after compression, but the error message and documentation suggest the limit applies to the uncompressed payload. This could lead to confusion and unexpected failures.

Clarify the documentation and consider checking the size before compression:

     /* Check if streaming ingestion is enabled */
     if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
         flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag);
 
-        /* Check payload size limit for streaming ingestion (4MB) */
-        flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking payload size: %zu bytes against 4MB limit", final_payload_size);
+        /* Check payload size limit for streaming ingestion (4MB after compression if enabled) */
+        flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking %s payload size: %zu bytes against 4MB limit", 
+                     is_compressed ? "compressed" : "uncompressed", final_payload_size);
         if (final_payload_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */
-            flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Payload size %zu bytes exceeds 4MB limit for streaming ingestion", final_payload_size);
+            flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: %s payload size %zu bytes exceeds 4MB limit for streaming ingestion", 
+                          is_compressed ? "Compressed" : "Uncompressed", final_payload_size);
             ret = FLB_ERROR;
             goto error;
         }

Let me verify the streaming ingestion size limit behavior:


🌐 Web query:

Azure Kusto streaming ingestion 4MB size limit compressed or uncompressed

💡 Result:

Short answer: The 4 MB streaming-ingestion limit is on the uncompressed data size. Microsoft docs and guidance treat the 4 MB streaming request limit as uncompressed; if the uncompressed size exceeds 4 MB the service will route to queued ingestion (and some SDKs explicitly state data >4 MB is moved to queued ingestion regardless of compression). [1][2][3][4]

Sources:

  • Azure Data Explorer — Configure streaming ingestion (data size limit: 4 MB). [1]
  • Azure Synapse / Data Explorer blog: “maximum data size per request is 4MB (uncompressed).” [2]
  • ManagedStreamingIngestionClient guidance: “Data that's larger than 4 MB is automatically sent to queued ingestion, regardless of format or compression” and SDK payload note. [3]
  • Ingestion FAQ: batching/data-size is calculated on uncompressed data (compressed uploads are evaluated by uncompressed size). [4]

Validate uncompressed payload size before compression

The 4 MB streaming-ingestion limit applies to the uncompressed data size; current checks against the post-compression size can allow payloads that exceed 4 MB uncompressed to be sent. Update the code to measure and enforce the 4 MB limit on the raw buffer before applying any compression, and adjust log messages accordingly.

     /* Check if streaming ingestion is enabled */
     if (ctx->streaming_ingestion_enabled == FLB_TRUE) {
         flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag);

-        /* Check payload size limit for streaming ingestion (4MB) */
-        flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking payload size: %zu bytes against 4MB limit", final_payload_size);
+        /* Check payload size limit for streaming ingestion (4MB uncompressed) */
+        flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking uncompressed payload size: %zu bytes against 4MB limit", uncompressed_size);
         if (uncompressed_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */
-            flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Payload size %zu bytes exceeds 4MB limit for streaming ingestion", final_payload_size);
+            flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Uncompressed payload size %zu bytes exceeds 4MB limit for streaming ingestion", uncompressed_size);
             ret = FLB_ERROR;
             goto error;
         }


🤖 Prompt for AI Agents
In plugins/out_azure_kusto/azure_kusto.c around lines 1447 to 1459, the code
currently enforces the 4 MB streaming ingestion limit on the final
(post-compression) payload; change this to measure and check the
uncompressed/raw buffer size before any compression occurs, so the 4MB check
uses the original buffer length (e.g., raw_buf_size) instead of
final_payload_size, update the log messages to reflect "uncompressed payload
size" when logging and erroring, and only proceed to compression if the raw size
passes the 4MB check (return FLB_ERROR and goto error when raw size > 4194304).
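
Concretely, the check would move ahead of the compression call in cb_azure_kusto_flush. In the sketch below, json_payload/json_size stand in for the uncompressed buffer produced by the formatter; the real variable names in the plugin may differ.

```c
    /* Reject oversized chunks before spending time on gzip: the 4 MiB
       streaming limit applies to the uncompressed data. */
    if (ctx->streaming_ingestion_enabled == FLB_TRUE &&
        json_size > 4 * 1024 * 1024) {
        flb_plg_error(ctx->ins,
                      "[FLUSH_STREAMING] uncompressed payload %zu bytes exceeds "
                      "the 4MB streaming ingestion limit", json_size);
        ret = FLB_ERROR;
        goto error;
    }

    /* Compression (if enabled) and the existing final_payload /
       final_payload_size handling follow only after this check passes. */
```

Returning FLB_ERROR rather than FLB_RETRY is deliberate here: an oversized chunk will not shrink on retry, so retrying would only loop.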

/* Perform streaming ingestion to Kusto */
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Initiating streaming ingestion to Kusto");
ret = azure_kusto_streaming_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size);
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion completed with result: %d", ret);

if (ret != 0) {
flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Streaming ingestion failed, will retry");
ret = FLB_RETRY;
goto error;
} else {
flb_plg_info(ctx->ins, "[FLUSH_STREAMING] SUCCESS: Streaming ingestion completed successfully");
}
} else {
flb_plg_debug(ctx->ins, "[FLUSH_QUEUED] Using queued ingestion mode (streaming ingestion disabled)");
/* Load or refresh ingestion resources for queued ingestion */
ret = azure_kusto_load_ingestion_resources(ctx, config);
flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot load ingestion resources");
ret = FLB_RETRY;
goto error;
}

/* Perform queued ingestion to Kusto */
ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL);
flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret);
if (ret != 0) {
flb_plg_error(ctx->ins, "cannot perform queued ingestion");
ret = FLB_RETRY;
goto error;
}
}

ret = FLB_OK;
@@ -1501,6 +1577,11 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config)
ctx->u = NULL;
}

if (ctx->u_cluster) {
flb_upstream_destroy(ctx->u_cluster);
ctx->u_cluster = NULL;
}

pthread_mutex_destroy(&ctx->resources_mutex);
pthread_mutex_destroy(&ctx->token_mutex);
pthread_mutex_destroy(&ctx->blob_mutex);
@@ -1565,6 +1646,11 @@ static struct flb_config_map config_map[] = {
offsetof(struct flb_azure_kusto, compression_enabled),
"Enable HTTP payload compression (gzip)."
"The default is true."},
{FLB_CONFIG_MAP_BOOL, "streaming_ingestion_enabled", "false", 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, streaming_ingestion_enabled),
"Enable streaming ingestion. When enabled, data is sent directly to Kusto engine without using blob storage and queues. "
"Note: Streaming ingestion has a 4MB limit per request and doesn't support buffering."
"The default is false (uses queued ingestion)."},
{FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE,
offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval),
"Set the azure kusto ingestion resources refresh interval"
8 changes: 7 additions & 1 deletion plugins/out_azure_kusto/azure_kusto.h
@@ -108,6 +108,9 @@ struct flb_azure_kusto {
/* compress payload */
int compression_enabled;

/* streaming ingestion mode */
int streaming_ingestion_enabled;

int ingestion_resources_refresh_interval;

/* records configuration */
@@ -167,6 +170,9 @@ struct flb_azure_kusto {
/* Upstream connection to the backend server */
struct flb_upstream *u;

/* Upstream connection to the main Kusto cluster for streaming ingestion */
struct flb_upstream *u_cluster;

struct flb_upstream *imds_upstream;

/* Fluent Bit context */
@@ -179,4 +185,4 @@ struct flb_azure_kusto {
flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx);
flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl);

#endif
#endif
18 changes: 18 additions & 0 deletions plugins/out_azure_kusto/azure_kusto_conf.c
@@ -796,6 +796,24 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *
return NULL;
}

/* Validate mutual exclusivity between buffering and streaming ingestion */
if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) {
flb_plg_error(ctx->ins, "buffering_enabled and streaming_ingestion_enabled cannot both be true. When buffering is enabled, streaming ingestion is automatically disabled");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

Comment on lines +799 to +805
🛠️ Refactor suggestion

Don’t hard-fail when both buffering and streaming are enabled; resolve deterministically.

The error message contradicts itself: it claims streaming ingestion is "automatically disabled" while the code actually hard-fails. Disable either buffering or streaming deterministically and continue with a warning instead.

-    if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) {
-        flb_plg_error(ctx->ins, "buffering_enabled and streaming_ingestion_enabled cannot both be true. When buffering is enabled, streaming ingestion is automatically disabled");
-        flb_azure_kusto_conf_destroy(ctx);
-        return NULL;
-    }
+    if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) {
+        /* Prefer queued ingestion when buffering is explicitly enabled */
+        ctx->streaming_ingestion_enabled = FLB_FALSE;
+        flb_plg_warn(ctx->ins, "buffering_enabled=true overrides streaming_ingestion_enabled; streaming disabled");
+    }
🤖 Prompt for AI Agents
In plugins/out_azure_kusto/azure_kusto_conf.c around lines 799-805, the code
currently hard-fails when both buffering_enabled and streaming_ingestion_enabled
are true; change this to deterministically resolve the conflict: replace the
error+destroy+return with a warning log stating streaming ingestion will be
disabled, set ctx->streaming_ingestion_enabled = FLB_FALSE (or 0) so buffering
remains active, and continue initialization without freeing ctx so operation
proceeds with buffering only.

/* Log ingestion mode selection */
if (ctx->streaming_ingestion_enabled) {
flb_plg_info(ctx->ins, "streaming ingestion mode enabled - data will be sent directly to Kusto engine (4MB payload limit per request, no local buffering support)");
} else {
if (ctx->buffering_enabled) {
flb_plg_info(ctx->ins, "queued ingestion mode enabled with local file buffering - data will be sent via blob storage and ingestion queues");
} else {
flb_plg_info(ctx->ins, "queued ingestion mode enabled - data will be sent via blob storage and ingestion queues");
}
}

/* Create oauth2 context */
if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM ||
ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER) {
143 changes: 142 additions & 1 deletion plugins/out_azure_kusto/azure_kusto_ingest.c
@@ -517,6 +517,147 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t
return ret;
}

int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
size_t tag_len, flb_sds_t payload, size_t payload_size)
{
int ret = -1;
struct flb_connection *u_conn;
struct flb_http_client *c;
flb_sds_t uri = NULL;
flb_sds_t token = NULL;
size_t resp_size;
time_t now;
struct tm tm;
char tmp[64];
int len;

flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting for tag: %.*s, payload: %zu bytes, db: %s, table: %s, compression: %s",
(int)tag_len, tag, payload_size, ctx->database_name, ctx->table_name, ctx->compression_enabled ? "enabled" : "disabled");

now = time(NULL);
gmtime_r(&now, &tm);
len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm);
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Request timestamp: %s", tmp);

Comment on lines +534 to +541
🛠️ Refactor suggestion

Pre-validate the 4 MiB streaming limit (avoid 413s and wasted I/O).

Check payload_size upfront (and compressed size when applicable) before opening a connection.

     flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting for tag: %.*s, payload: %zu bytes, db: %s, table: %s, compression: %s",
                  (int)tag_len, tag, payload_size, ctx->database_name, ctx->table_name, ctx->compression_enabled ? "enabled" : "disabled");
+
+    /* Kusto streaming hard limit: 4 MiB per request payload */
+    if (payload_size > (4 * 1024 * 1024)) {
+        flb_plg_error(ctx->ins, "[STREAMING_INGESTION] payload_size=%zu exceeds 4MiB limit", payload_size);
+        return -1;
+    }
🤖 Prompt for AI Agents
In plugins/out_azure_kusto/azure_kusto_ingest.c around lines 534-541, add an
upfront size validation before any network/connection work: check payload_size
against the 4 MiB streaming limit and, if compression is enabled, compute or
obtain the compressed size and validate that too; if the size exceeds the limit,
log an informative error (including tag, db/table, sizes) and abort/return early
with the appropriate error code instead of proceeding to open a connection or
send data.

/* Get upstream connection to the main Kusto cluster endpoint (for streaming ingestion) */
if (!ctx->u_cluster) {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: cluster upstream not available - streaming ingestion requires cluster endpoint");
return -1;
}

flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Getting upstream connection to cluster endpoint");
u_conn = flb_upstream_conn_get(ctx->u_cluster);
if (!u_conn) {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to get cluster upstream connection");
return -1;
}
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained upstream connection");

/* Get authentication token */
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Retrieving OAuth2 authentication token");
token = get_azure_kusto_token(ctx);
if (!token) {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to retrieve OAuth2 token");
flb_upstream_conn_release(u_conn);
return -1;
}
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained OAuth2 token (length: %zu)", flb_sds_len(token));

/* Build the streaming ingestion URI */
uri = flb_sds_create_size(256);
if (!uri) {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create URI buffer");
flb_sds_destroy(token);
flb_upstream_conn_release(u_conn);
return -1;
}

/* Create the streaming ingestion URI */
if (ctx->ingestion_mapping_reference) {
flb_sds_snprintf(&uri, flb_sds_alloc(uri),
"/v1/rest/ingest/%s/%s?streamFormat=MultiJSON&mappingName=%s",
ctx->database_name, ctx->table_name, ctx->ingestion_mapping_reference);
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Using ingestion mapping: %s", ctx->ingestion_mapping_reference);
} else {
flb_sds_snprintf(&uri, flb_sds_alloc(uri),
"/v1/rest/ingest/%s/%s?streamFormat=MultiJSON",
ctx->database_name, ctx->table_name);
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] No ingestion mapping specified");
}
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Request URI: %s", uri);

/* Create HTTP client for streaming ingestion */
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Creating HTTP client for POST request");
c = flb_http_client(u_conn, FLB_HTTP_POST, uri, payload, payload_size,
NULL, 0, NULL, 0);

if (c) {
/* Add required headers for streaming ingestion */
flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
flb_http_add_header(c, "Accept", 6, "application/json", 16);
flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token));
flb_http_add_header(c, "x-ms-date", 9, tmp, len);
flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR));
flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16);
flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16);

/* Set Content-Type based on whether compression is enabled */
if (ctx->compression_enabled) {
flb_http_add_header(c, "Content-Type", 12, "application/json", 16);
flb_http_add_header(c, "Content-Encoding", 16, "gzip", 4);
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Headers set for compressed payload");
} else {
flb_http_add_header(c, "Content-Type", 12, "application/json; charset=utf-8", 31);
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Headers set for uncompressed payload");
}

flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Payload sample (first 200 chars): %.200s", (char*)payload);
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Sending HTTP POST request to Kusto engine");

/* Send the HTTP request */
ret = flb_http_do(c, &resp_size);

flb_plg_info(ctx->ins, "[STREAMING_INGESTION] HTTP request completed - http_do result: %d, HTTP Status: %i, Response size: %zu", ret, c->resp.status, resp_size);

if (ret == 0) {
/* Check for successful HTTP status codes */
if (c->resp.status == 200 || c->resp.status == 204) {
ret = 0;
flb_plg_info(ctx->ins, "[STREAMING_INGESTION] SUCCESS: Data successfully ingested to Kusto (HTTP %d)", c->resp.status);
} else {
ret = -1;
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed with status: %i", c->resp.status);

if (c->resp.payload_size > 0) {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] Error response body (size: %zu): %.*s",
c->resp.payload_size, (int)c->resp.payload_size, c->resp.payload);
}
}
} else {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed at transport level (ret=%d)", ret);
}

flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Destroying HTTP client");
flb_http_client_destroy(c);
} else {
flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create HTTP client context");
}

/* Cleanup */
flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Cleaning up resources");
if (uri) {
flb_sds_destroy(uri);
}
if (token) {
flb_sds_destroy(token);
}
flb_upstream_conn_release(u_conn);

flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Streaming ingestion completed with result: %d", ret);
return ret;
}


/* Function to generate a random alphanumeric string */
void generate_random_string(char *str, size_t length)
{
@@ -658,4 +799,4 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
}

return ret;
}
}
5 changes: 4 additions & 1 deletion plugins/out_azure_kusto/azure_kusto_ingest.h
@@ -26,4 +26,7 @@
int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
size_t tag_len, flb_sds_t payload, size_t payload_size, struct azure_kusto_file *upload_file);

#endif
int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag,
size_t tag_len, flb_sds_t payload, size_t payload_size);

#endif