From 7f4efea0054370610763214df678ba13fafadcb0 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Thu, 28 Aug 2025 16:31:53 +0530 Subject: [PATCH 1/5] out_azure_kusto:added streaming ingestion support Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto.c | 63 ++++++-- plugins/out_azure_kusto/azure_kusto.h | 5 +- plugins/out_azure_kusto/azure_kusto_ingest.c | 159 ++++++++++++++++++- plugins/out_azure_kusto/azure_kusto_ingest.h | 5 +- 4 files changed, 214 insertions(+), 18 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index ccc611112d6..dfca659c955 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -1396,22 +1396,50 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, } flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size); - /* Load or refresh ingestion resources */ - ret = azure_kusto_load_ingestion_resources(ctx, config); - flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret); - if (ret != 0) { - flb_plg_error(ctx->ins, "cannot load ingestion resources"); - ret = FLB_RETRY; - goto error; - } + /* Check if streaming ingestion is enabled */ + if (ctx->streaming_ingestion_enabled == FLB_TRUE) { + flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag); + + /* Check payload size limit for streaming ingestion (4MB) */ + flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking payload size: %zu bytes against 4MB limit", final_payload_size); + if (final_payload_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */ + flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Payload size %zu bytes exceeds 4MB limit for streaming ingestion", final_payload_size); + ret = FLB_ERROR; + goto error; + } + flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Payload size check passed (%zu bytes < 4MB)", final_payload_size); - /* Perform queued ingestion to Kusto */ - ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL); - flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); - if (ret != 0) { - flb_plg_error(ctx->ins, "cannot perform queued ingestion"); - ret = FLB_RETRY; - goto error; + /* Perform streaming ingestion to Kusto */ + flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Initiating streaming ingestion to Kusto"); + ret = azure_kusto_streaming_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size); + flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion completed with result: %d", ret); + + if (ret != 0) { + flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Streaming ingestion failed, will retry"); + ret = FLB_RETRY; + goto error; + } else { + flb_plg_info(ctx->ins, "[FLUSH_STREAMING] SUCCESS: Streaming ingestion completed successfully"); + } + } else { + flb_plg_debug(ctx->ins, "[FLUSH_QUEUED] Using queued ingestion mode (streaming ingestion disabled)"); + /* Load or refresh ingestion resources for queued ingestion */ + ret = azure_kusto_load_ingestion_resources(ctx, config); + flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot load ingestion resources"); + ret = FLB_RETRY; + goto error; + } + + /* Perform queued ingestion to Kusto */ + ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL); + flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot perform queued ingestion"); + ret = FLB_RETRY; + goto error; + } } ret = FLB_OK; @@ -1565,6 +1593,11 @@ static struct flb_config_map config_map[] = { offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)." "The default is true."}, + {FLB_CONFIG_MAP_BOOL, "streaming_ingestion_enabled", "false", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, streaming_ingestion_enabled), + "Enable streaming ingestion. When enabled, data is sent directly to Kusto engine without using blob storage and queues. " + "Note: Streaming ingestion has a 4MB limit per request and doesn't support buffering." + "The default is false (uses queued ingestion)."}, {FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE, offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval), "Set the azure kusto ingestion resources refresh interval" diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index 362b1379533..4478882b7a3 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -108,6 +108,9 @@ struct flb_azure_kusto { /* compress payload */ int compression_enabled; + /* streaming ingestion mode */ + int streaming_ingestion_enabled; + int ingestion_resources_refresh_interval; /* records configuration */ @@ -179,4 +182,4 @@ struct flb_azure_kusto { flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx); flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl); -#endif \ No newline at end of file +#endif diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 38d2aa076e5..682d7e710ad 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -517,6 +517,163 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t return ret; } +int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, + size_t tag_len, flb_sds_t payload, size_t payload_size) +{ + int ret = -1; + struct flb_connection *u_conn; + struct flb_http_client *c; + flb_sds_t uri = NULL; + flb_sds_t token = NULL; + size_t resp_size; + time_t now; + struct tm tm; + char tmp[64]; + int len; + + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting streaming ingestion for tag: %.*s", (int)tag_len, tag); + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Payload size: %zu bytes", payload_size); + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Target database: %s, table: %s", + ctx->database_name ? ctx->database_name : "NULL", + ctx->table_name ? ctx->table_name : "NULL"); + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Compression enabled: %s", + ctx->compression_enabled ? "true" : "false"); + + now = time(NULL); + gmtime_r(&now, &tm); + len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Request timestamp: %s", tmp); + + /* Get upstream connection to the main Kusto engine (not ingestion endpoint) */ + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Getting upstream connection to endpoint: %s", + ctx->ingestion_endpoint ? ctx->ingestion_endpoint : "NULL"); + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: Failed to get upstream connection"); + return -1; + } + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained upstream connection"); + + /* Get authentication token */ + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Retrieving OAuth2 authentication token"); + token = get_azure_kusto_token(ctx); + if (!token) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: Failed to retrieve OAuth2 token"); + flb_upstream_conn_release(u_conn); + return -1; + } + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained OAuth2 token (length: %zu)", + flb_sds_len(token)); + + /* Build the streaming ingestion URI: /v1/rest/ingest/{database}/{table}?streamFormat=MultiJSON */ + uri = flb_sds_create_size(256); + if (!uri) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: Failed to create URI buffer"); + flb_sds_destroy(token); + flb_upstream_conn_release(u_conn); + return -1; + } + + /* Create the streaming ingestion URI */ + if (ctx->ingestion_mapping_reference) { + flb_sds_snprintf(&uri, flb_sds_alloc(uri), + "/v1/rest/ingest/%s/%s?streamFormat=MultiJSON&mappingName=%s", + ctx->database_name, ctx->table_name, ctx->ingestion_mapping_reference); + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Using ingestion mapping: %s", + ctx->ingestion_mapping_reference); + } else { + flb_sds_snprintf(&uri, flb_sds_alloc(uri), + "/v1/rest/ingest/%s/%s?streamFormat=MultiJSON", + ctx->database_name, ctx->table_name); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] No ingestion mapping specified"); + } + + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Request URI: %s", uri); + + /* Create HTTP client for streaming ingestion */ + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Creating HTTP client for POST request"); + c = flb_http_client(u_conn, FLB_HTTP_POST, uri, payload, payload_size, + NULL, 0, NULL, 0); + + if (c) { + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Adding HTTP headers"); + + /* Add required headers for streaming ingestion */ + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + flb_http_add_header(c, "Accept", 6, "application/json", 16); + flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); + flb_http_add_header(c, "x-ms-date", 9, tmp, len); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); + flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); + flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); + + /* Set Content-Type based on whether compression is enabled */ + if (ctx->compression_enabled) { + flb_http_add_header(c, "Content-Type", 12, "application/json", 16); + flb_http_add_header(c, "Content-Encoding", 16, "gzip", 4); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Headers set for compressed payload"); + } else { + flb_http_add_header(c, "Content-Type", 12, "application/json; charset=utf-8", 31); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Headers set for uncompressed payload"); + } + + /* Log payload sample for debugging (first 200 characters) */ + if (payload_size > 0) { + size_t sample_len = payload_size > 200 ? 200 : payload_size; + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Payload sample (first %zu chars): %.*s%s", + sample_len, (int)sample_len, (char*)payload, + payload_size > 200 ? "..." : ""); + } + + /* Send the HTTP request */ + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Sending HTTP POST request to Kusto engine"); + ret = flb_http_do(c, &resp_size); + + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] HTTP request completed - http_do result: %i, HTTP Status: %i, Response size: %zu", + ret, c->resp.status, resp_size); + + if (ret == 0) { + /* Check for successful HTTP status codes */ + if (c->resp.status == 200 || c->resp.status == 204) { + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] SUCCESS: Streaming ingestion completed successfully for tag: %.*s", + (int)tag_len, tag); + ret = 0; + } else { + ret = -1; + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed with status: %i", + c->resp.status); + + if (c->resp.payload_size > 0) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] Error response body (size: %zu): %.*s", + c->resp.payload_size, (int)c->resp.payload_size, c->resp.payload); + } else { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] No error response body received"); + } + } + } else { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed at transport level (ret: %i)", ret); + } + + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Destroying HTTP client"); + flb_http_client_destroy(c); + } else { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: Failed to create HTTP client context"); + } + + /* Cleanup */ + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Cleaning up resources"); + if (uri) { + flb_sds_destroy(uri); + } + if (token) { + flb_sds_destroy(token); + } + flb_upstream_conn_release(u_conn); + + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Streaming ingestion completed with result: %i", ret); + return ret; +} + /* Function to generate a random alphanumeric string */ void generate_random_string(char *str, size_t length) { @@ -658,4 +815,4 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, } return ret; -} \ No newline at end of file +} diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.h b/plugins/out_azure_kusto/azure_kusto_ingest.h index b60796f4fd1..80a95105937 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.h +++ b/plugins/out_azure_kusto/azure_kusto_ingest.h @@ -26,4 +26,7 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, size_t tag_len, flb_sds_t payload, size_t payload_size, struct azure_kusto_file *upload_file); -#endif \ No newline at end of file +int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, + size_t tag_len, flb_sds_t payload, size_t payload_size); + +#endif From 32e99e5e7044912c5a562e9b9cab5aff749d6823 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Mon, 1 Sep 2025 10:43:22 +0530 Subject: [PATCH 2/5] modified URI --- plugins/out_azure_kusto/azure_kusto_ingest.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 682d7e710ad..8e4e3cd2008 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -574,17 +574,17 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, return -1; } - /* Create the streaming ingestion URI */ + /* Create the streaming ingestion URI - table name goes in headers, not URL path */ if (ctx->ingestion_mapping_reference) { flb_sds_snprintf(&uri, flb_sds_alloc(uri), - "/v1/rest/ingest/%s/%s?streamFormat=MultiJSON&mappingName=%s", - ctx->database_name, ctx->table_name, ctx->ingestion_mapping_reference); + "/v1/rest/ingest/%s?streamFormat=MultiJSON&mappingName=%s", + ctx->database_name, ctx->ingestion_mapping_reference); flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Using ingestion mapping: %s", ctx->ingestion_mapping_reference); } else { flb_sds_snprintf(&uri, flb_sds_alloc(uri), - "/v1/rest/ingest/%s/%s?streamFormat=MultiJSON", - ctx->database_name, ctx->table_name); + "/v1/rest/ingest/%s?streamFormat=MultiJSON", + ctx->database_name); flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] No ingestion mapping specified"); } @@ -606,6 +606,10 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); + + /* Add table name as header (required for streaming ingestion) */ + flb_http_add_header(c, "Table", 5, ctx->table_name, flb_sds_len(ctx->table_name)); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Added Table header: %s", ctx->table_name); /* Set Content-Type based on whether compression is enabled */ if (ctx->compression_enabled) { From 10c0a98a5897cd542724ac519d1c2266302fa0bf Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Mon, 1 Sep 2025 12:12:16 +0530 Subject: [PATCH 3/5] modified the URL --- plugins/out_azure_kusto/azure_kusto_ingest.c | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 8e4e3cd2008..e6cb11d2107 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -574,17 +574,17 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, return -1; } - /* Create the streaming ingestion URI - table name goes in headers, not URL path */ + /* Create the streaming ingestion URI - both database and table are required in URL path per ADX API spec */ if (ctx->ingestion_mapping_reference) { flb_sds_snprintf(&uri, flb_sds_alloc(uri), - "/v1/rest/ingest/%s?streamFormat=MultiJSON&mappingName=%s", - ctx->database_name, ctx->ingestion_mapping_reference); + "/v1/rest/ingest/%s/%s?streamFormat=MultiJSON&mappingName=%s", + ctx->database_name, ctx->table_name, ctx->ingestion_mapping_reference); flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Using ingestion mapping: %s", ctx->ingestion_mapping_reference); } else { flb_sds_snprintf(&uri, flb_sds_alloc(uri), - "/v1/rest/ingest/%s?streamFormat=MultiJSON", - ctx->database_name); + "/v1/rest/ingest/%s/%s?streamFormat=MultiJSON", + ctx->database_name, ctx->table_name); flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] No ingestion mapping specified"); } @@ -607,9 +607,7 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); - /* Add table name as header (required for streaming ingestion) */ - flb_http_add_header(c, "Table", 5, ctx->table_name, flb_sds_len(ctx->table_name)); - flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Added Table header: %s", ctx->table_name); + /* Note: Table name is now included in URL path, no need for Table header */ /* Set Content-Type based on whether compression is enabled */ if (ctx->compression_enabled) { From 5706e9dec381f20fc2f7bf01267d2f6dbe92cdd8 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Mon, 1 Sep 2025 12:47:40 +0530 Subject: [PATCH 4/5] modified endpoint --- plugins/out_azure_kusto/azure_kusto.c | 53 ++++++++++++++++++++ plugins/out_azure_kusto/azure_kusto.h | 3 ++ plugins/out_azure_kusto/azure_kusto_ingest.c | 14 ++++-- 3 files changed, 65 insertions(+), 5 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index dfca659c955..322462618ee 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -943,6 +943,54 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi return -1; } + /* + * Create upstream context for Kusto Cluster endpoint (for streaming ingestion) + * Convert ingestion endpoint to cluster endpoint by removing "ingest-" prefix + */ + if (ctx->streaming_ingestion_enabled == FLB_TRUE) { + flb_sds_t cluster_endpoint = NULL; + + /* Check if ingestion endpoint contains "ingest-" prefix */ + if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) { + /* Create cluster endpoint by removing "ingest-" prefix */ + cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint); + if (!cluster_endpoint) { + flb_plg_error(ctx->ins, "failed to create cluster endpoint string"); + return -1; + } + + /* Replace "ingest-" with empty string to get cluster endpoint */ + char *ingest_pos = strstr(cluster_endpoint, "ingest-"); + if (ingest_pos) { + /* Move the rest of the string to remove "ingest-" */ + memmove(ingest_pos, ingest_pos + 7, strlen(ingest_pos + 7) + 1); + flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - 7); + } + + flb_plg_info(ctx->ins, "Creating cluster upstream connection to: %s", cluster_endpoint); + + /* Create upstream connection to cluster endpoint */ + ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls); + if (!ctx->u_cluster) { + flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint); + flb_sds_destroy(cluster_endpoint); + return -1; + } + + flb_sds_destroy(cluster_endpoint); + } else { + flb_plg_warn(ctx->ins, "ingestion endpoint does not contain 'ingest-' prefix, using as cluster endpoint"); + /* Use ingestion endpoint directly as cluster endpoint */ + ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls); + if (!ctx->u_cluster) { + flb_plg_error(ctx->ins, "cluster upstream creation failed"); + return -1; + } + } + + flb_plg_info(ctx->ins, "Cluster upstream connection created successfully for streaming ingestion"); + } + flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base)); /* Create oauth2 context */ @@ -1529,6 +1577,11 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config) ctx->u = NULL; } + if (ctx->u_cluster) { + flb_upstream_destroy(ctx->u_cluster); + ctx->u_cluster = NULL; + } + pthread_mutex_destroy(&ctx->resources_mutex); pthread_mutex_destroy(&ctx->token_mutex); pthread_mutex_destroy(&ctx->blob_mutex); diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index 4478882b7a3..6b53eb23c4f 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -170,6 +170,9 @@ struct flb_azure_kusto { /* Upstream connection to the backend server */ struct flb_upstream *u; + /* Upstream connection to the main Kusto cluster for streaming ingestion */ + struct flb_upstream *u_cluster; + struct flb_upstream *imds_upstream; /* Fluent Bit context */ diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index e6cb11d2107..11a206aba81 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -544,12 +544,16 @@ int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm); flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Request timestamp: %s", tmp); - /* Get upstream connection to the main Kusto engine (not ingestion endpoint) */ - flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Getting upstream connection to endpoint: %s", - ctx->ingestion_endpoint ? ctx->ingestion_endpoint : "NULL"); - u_conn = flb_upstream_conn_get(ctx->u); + /* Get upstream connection to the main Kusto cluster endpoint (for streaming ingestion) */ + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Getting upstream connection to cluster endpoint"); + if (!ctx->u_cluster) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: Cluster upstream not available - streaming ingestion requires cluster endpoint"); + return -1; + } + + u_conn = flb_upstream_conn_get(ctx->u_cluster); if (!u_conn) { - flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: Failed to get upstream connection"); + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: Failed to get cluster upstream connection"); return -1; } flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained upstream connection"); From 6a2579e091080f1bf133e994cfac1b06759fe226 Mon Sep 17 00:00:00 2001 From: Tanmaya Panda Date: Thu, 28 Aug 2025 16:31:53 +0530 Subject: [PATCH 5/5] out_azure_kusto:added streaming ingestion support Signed-off-by: Tanmaya Panda --- plugins/out_azure_kusto/azure_kusto.c | 100 +++++++++++-- plugins/out_azure_kusto/azure_kusto.h | 8 +- plugins/out_azure_kusto/azure_kusto_conf.c | 18 +++ plugins/out_azure_kusto/azure_kusto_ingest.c | 143 ++++++++++++++++++- plugins/out_azure_kusto/azure_kusto_ingest.h | 5 +- 5 files changed, 256 insertions(+), 18 deletions(-) diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index ccc611112d6..b2ad587b7c1 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -943,6 +943,54 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi return -1; } + /* + * Create upstream context for Kusto Cluster endpoint (for streaming ingestion) + * Convert ingestion endpoint to cluster endpoint by removing "ingest-" prefix + */ + if (ctx->streaming_ingestion_enabled == FLB_TRUE) { + flb_sds_t cluster_endpoint = NULL; + + /* Check if ingestion endpoint contains "ingest-" prefix */ + if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) { + /* Create cluster endpoint by removing "ingest-" prefix */ + cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint); + if (!cluster_endpoint) { + flb_plg_error(ctx->ins, "failed to create cluster endpoint string"); + return -1; + } + + /* Replace "ingest-" with empty string to get cluster endpoint */ + char *ingest_pos = strstr(cluster_endpoint, "ingest-"); + if (ingest_pos) { + /* Move the rest of the string to remove "ingest-" */ + memmove(ingest_pos, ingest_pos + 7, strlen(ingest_pos + 7) + 1); + flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - 7); + } + + flb_plg_info(ctx->ins, "Creating cluster upstream connection to: %s", cluster_endpoint); + + /* Create upstream connection to cluster endpoint */ + ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls); + if (!ctx->u_cluster) { + flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint); + flb_sds_destroy(cluster_endpoint); + return -1; + } + + flb_sds_destroy(cluster_endpoint); + } else { + flb_plg_warn(ctx->ins, "ingestion endpoint does not contain 'ingest-' prefix, using as cluster endpoint"); + /* Use ingestion endpoint directly as cluster endpoint */ + ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls); + if (!ctx->u_cluster) { + flb_plg_error(ctx->ins, "cluster upstream creation failed"); + return -1; + } + } + + flb_plg_info(ctx->ins, "Cluster upstream connection created successfully for streaming ingestion"); + } + flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base)); /* Create oauth2 context */ @@ -1396,22 +1444,34 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, } flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size); - /* Load or refresh ingestion resources */ - ret = azure_kusto_load_ingestion_resources(ctx, config); - flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret); - if (ret != 0) { - flb_plg_error(ctx->ins, "cannot load ingestion resources"); - ret = FLB_RETRY; - goto error; - } + /* Check if streaming ingestion is enabled */ + if (ctx->streaming_ingestion_enabled == FLB_TRUE) { + /* Perform streaming ingestion to Kusto */ + ret = azure_kusto_streaming_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size); + + if (ret != 0) { + flb_plg_error(ctx->ins, "streaming ingestion failed, will retry"); + ret = FLB_RETRY; + goto error; + } + } else { + /* Load or refresh ingestion resources for queued ingestion */ + ret = azure_kusto_load_ingestion_resources(ctx, config); + flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot load ingestion resources"); + ret = FLB_RETRY; + goto error; + } - /* Perform queued ingestion to Kusto */ - ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL); - flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); - if (ret != 0) { - flb_plg_error(ctx->ins, "cannot perform queued ingestion"); - ret = FLB_RETRY; - goto error; + /* Perform queued ingestion to Kusto */ + ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL); + flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot perform queued ingestion"); + ret = FLB_RETRY; + goto error; + } } ret = FLB_OK; @@ -1501,6 +1561,11 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config) ctx->u = NULL; } + if (ctx->u_cluster) { + flb_upstream_destroy(ctx->u_cluster); + ctx->u_cluster = NULL; + } + pthread_mutex_destroy(&ctx->resources_mutex); pthread_mutex_destroy(&ctx->token_mutex); pthread_mutex_destroy(&ctx->blob_mutex); @@ -1565,6 +1630,11 @@ static struct flb_config_map config_map[] = { offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)." "The default is true."}, + {FLB_CONFIG_MAP_BOOL, "streaming_ingestion_enabled", "false", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, streaming_ingestion_enabled), + "Enable streaming ingestion. When enabled, data is sent directly to Kusto engine without using blob storage and queues. " + "Note: Streaming ingestion has a 4MB limit per request and doesn't support buffering." + "The default is false (uses queued ingestion)."}, {FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE, offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval), "Set the azure kusto ingestion resources refresh interval" diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index 362b1379533..6b53eb23c4f 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -108,6 +108,9 @@ struct flb_azure_kusto { /* compress payload */ int compression_enabled; + /* streaming ingestion mode */ + int streaming_ingestion_enabled; + int ingestion_resources_refresh_interval; /* records configuration */ @@ -167,6 +170,9 @@ struct flb_azure_kusto { /* Upstream connection to the backend server */ struct flb_upstream *u; + /* Upstream connection to the main Kusto cluster for streaming ingestion */ + struct flb_upstream *u_cluster; + struct flb_upstream *imds_upstream; /* Fluent Bit context */ @@ -179,4 +185,4 @@ struct flb_azure_kusto { flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx); flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl); -#endif \ No newline at end of file +#endif diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index fa899eab686..ac471b7c3ec 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -796,6 +796,24 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * return NULL; } + /* Validate mutual exclusivity between buffering and streaming ingestion */ + if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) { + flb_plg_error(ctx->ins, "buffering_enabled and streaming_ingestion_enabled cannot both be true. When buffering is enabled, streaming ingestion is automatically disabled"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + /* Log ingestion mode selection */ + if (ctx->streaming_ingestion_enabled) { + flb_plg_info(ctx->ins, "streaming ingestion mode enabled - data will be sent directly to Kusto engine (4MB payload limit per request, no local buffering support)"); + } else { + if (ctx->buffering_enabled) { + flb_plg_info(ctx->ins, "queued ingestion mode enabled with local file buffering - data will be sent via blob storage and ingestion queues"); + } else { + flb_plg_info(ctx->ins, "queued ingestion mode enabled - data will be sent via blob storage and ingestion queues"); + } + } + /* Create oauth2 context */ if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM || ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER) { diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 38d2aa076e5..03a1421ec44 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -517,6 +517,147 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t return ret; } +int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, + size_t tag_len, flb_sds_t payload, size_t payload_size) +{ + int ret = -1; + struct flb_connection *u_conn; + struct flb_http_client *c; + flb_sds_t uri = NULL; + flb_sds_t token = NULL; + size_t resp_size; + time_t now; + struct tm tm; + char tmp[64]; + int len; + + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting for tag: %.*s, payload: %zu bytes, db: %s, table: %s, compression: %s", + (int)tag_len, tag, payload_size, ctx->database_name, ctx->table_name, ctx->compression_enabled ? "enabled" : "disabled"); + + now = time(NULL); + gmtime_r(&now, &tm); + len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Request timestamp: %s", tmp); + + /* Get upstream connection to the main Kusto cluster endpoint (for streaming ingestion) */ + if (!ctx->u_cluster) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: cluster upstream not available - streaming ingestion requires cluster endpoint"); + return -1; + } + + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Getting upstream connection to cluster endpoint"); + u_conn = flb_upstream_conn_get(ctx->u_cluster); + if (!u_conn) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to get cluster upstream connection"); + return -1; + } + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained upstream connection"); + + /* Get authentication token */ + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Retrieving OAuth2 authentication token"); + token = get_azure_kusto_token(ctx); + if (!token) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to retrieve OAuth2 token"); + flb_upstream_conn_release(u_conn); + return -1; + } + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained OAuth2 token (length: %zu)", flb_sds_len(token)); + + /* Build the streaming ingestion URI */ + uri = flb_sds_create_size(256); + if (!uri) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create URI buffer"); + flb_sds_destroy(token); + flb_upstream_conn_release(u_conn); + return -1; + } + + /* Create the streaming ingestion URI */ + if (ctx->ingestion_mapping_reference) { + flb_sds_snprintf(&uri, flb_sds_alloc(uri), + "/v1/rest/ingest/%s/%s?streamFormat=MultiJSON&mappingName=%s", + ctx->database_name, ctx->table_name, ctx->ingestion_mapping_reference); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Using ingestion mapping: %s", ctx->ingestion_mapping_reference); + } else { + flb_sds_snprintf(&uri, flb_sds_alloc(uri), + "/v1/rest/ingest/%s/%s?streamFormat=MultiJSON", + ctx->database_name, ctx->table_name); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] No ingestion mapping specified"); + } + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Request URI: %s", uri); + + /* Create HTTP client for streaming ingestion */ + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Creating HTTP client for POST request"); + c = flb_http_client(u_conn, FLB_HTTP_POST, uri, payload, payload_size, + NULL, 0, NULL, 0); + + if (c) { + /* Add required headers for streaming ingestion */ + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + flb_http_add_header(c, "Accept", 6, "application/json", 16); + flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); + flb_http_add_header(c, "x-ms-date", 9, tmp, len); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); + flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); + flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); + + /* Set Content-Type based on whether compression is enabled */ + if (ctx->compression_enabled) { + flb_http_add_header(c, "Content-Type", 12, "application/json", 16); + flb_http_add_header(c, "Content-Encoding", 16, "gzip", 4); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Headers set for compressed payload"); + } else { + flb_http_add_header(c, "Content-Type", 12, "application/json; charset=utf-8", 31); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Headers set for uncompressed payload"); + } + + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Payload sample (first 200 chars): %.200s", (char*)payload); + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Sending HTTP POST request to Kusto engine"); + + /* Send the HTTP request */ + ret = flb_http_do(c, &resp_size); + + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] HTTP request completed - http_do result: %d, HTTP Status: %i, Response size: %zu", ret, c->resp.status, resp_size); + + if (ret == 0) { + /* Check for successful HTTP status codes */ + if (c->resp.status == 200 || c->resp.status == 204) { + ret = 0; + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] SUCCESS: Data successfully ingested to Kusto (HTTP %d)", c->resp.status); + } else { + ret = -1; + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed with status: %i", c->resp.status); + + if (c->resp.payload_size > 0) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] Error response body (size: %zu): %.*s", + c->resp.payload_size, (int)c->resp.payload_size, c->resp.payload); + } + } + } else { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed at transport level (ret=%d)", ret); + } + + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Destroying HTTP client"); + flb_http_client_destroy(c); + } else { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create HTTP client context"); + } + + /* Cleanup */ + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Cleaning up resources"); + if (uri) { + flb_sds_destroy(uri); + } + if (token) { + flb_sds_destroy(token); + } + flb_upstream_conn_release(u_conn); + + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Streaming ingestion completed with result: %d", ret); + return ret; +} + + /* Function to generate a random alphanumeric string */ void generate_random_string(char *str, size_t length) { @@ -658,4 +799,4 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, } return ret; -} \ No newline at end of file +} diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.h b/plugins/out_azure_kusto/azure_kusto_ingest.h index b60796f4fd1..80a95105937 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.h +++ b/plugins/out_azure_kusto/azure_kusto_ingest.h @@ -26,4 +26,7 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, size_t tag_len, flb_sds_t payload, size_t payload_size, struct azure_kusto_file *upload_file); -#endif \ No newline at end of file +int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, + size_t tag_len, flb_sds_t payload, size_t payload_size); + +#endif