diff --git a/SPECS/fluent-bit/CVE-2024-26455.patch b/SPECS/fluent-bit/CVE-2024-26455.patch deleted file mode 100644 index 57d7c11254f..00000000000 --- a/SPECS/fluent-bit/CVE-2024-26455.patch +++ /dev/null @@ -1,60 +0,0 @@ -diff --git a/plugins/custom_calyptia/calyptia.c b/plugins/custom_calyptia/calyptia.c -index 5639af427..4aba53ca7 100644 ---- a/plugins/custom_calyptia/calyptia.c -+++ b/plugins/custom_calyptia/calyptia.c -@@ -245,7 +245,6 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config, - - if (!cloud) { - flb_plg_error(ctx->ins, "could not load Calyptia Cloud connector"); -- flb_free(ctx); - return NULL; - } - -@@ -254,7 +253,6 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config, - - if (ret != 0) { - flb_plg_error(ctx->ins, "could not load Calyptia Cloud connector"); -- flb_free(ctx); - return NULL; - } - -@@ -268,7 +266,6 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config, - label = flb_sds_create_size(strlen(key->str) + strlen(val->str) + 1); - - if (!label) { -- flb_free(ctx); - return NULL; - } - -@@ -316,7 +313,6 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config, - label = flb_sds_create_size(strlen("fleet_id") + strlen(ctx->fleet_id) + 1); - - if (!label) { -- flb_free(ctx); - return NULL; - } - -@@ -424,6 +420,7 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, - - if (ctx->machine_id == NULL) { - flb_plg_error(ctx->ins, "unable to retrieve machine_id"); -+ flb_free(ctx); - return -1; - } - -@@ -435,6 +432,7 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, - - if (!ctx->i) { - flb_plg_error(ctx->ins, "could not load metrics collector"); -+ flb_free(ctx); - return -1; - } - -@@ -455,6 +453,7 @@ static int cb_calyptia_init(struct flb_custom_instance *ins, - ctx->o = setup_cloud_output(config, ctx); - - if (ctx->o == NULL) { -+ flb_free(ctx); - return -1; - } - } diff --git a/SPECS/fluent-bit/CVE-2024-28182.patch b/SPECS/fluent-bit/CVE-2024-28182.patch new file mode 100644 index 00000000000..e75a5551b68 --- /dev/null +++ b/SPECS/fluent-bit/CVE-2024-28182.patch @@ -0,0 +1,91 @@ +diff --git a/lib/nghttp2/lib/includes/nghttp2/nghttp2.h b/lib/nghttp2/lib/includes/nghttp2/nghttp2.h +index 66ea3c63c..5378daf43 100644 +--- a/lib/nghttp2/lib/includes/nghttp2/nghttp2.h ++++ b/lib/nghttp2/lib/includes/nghttp2/nghttp2.h +@@ -440,7 +440,12 @@ typedef enum { + * exhaustion on server side to send these frames forever and does + * not read network. + */ +- NGHTTP2_ERR_FLOODED = -904 ++ NGHTTP2_ERR_FLOODED = -904, ++ /** ++ * When a local endpoint receives too many CONTINUATION frames ++ * following a HEADER frame. ++ */ ++ NGHTTP2_ERR_TOO_MANY_CONTINUATIONS = -905, + } nghttp2_error; + + /** +diff --git a/lib/nghttp2/lib/nghttp2_helper.c b/lib/nghttp2/lib/nghttp2_helper.c +index 93dd4754b..b3563d98e 100644 +--- a/lib/nghttp2/lib/nghttp2_helper.c ++++ b/lib/nghttp2/lib/nghttp2_helper.c +@@ -336,6 +336,8 @@ const char *nghttp2_strerror(int error_code) { + "closed"; + case NGHTTP2_ERR_TOO_MANY_SETTINGS: + return "SETTINGS frame contained more than the maximum allowed entries"; ++ case NGHTTP2_ERR_TOO_MANY_CONTINUATIONS: ++ return "Too many CONTINUATION frames following a HEADER frame"; + default: + return "Unknown error code"; + } +diff --git a/lib/nghttp2/lib/nghttp2_session.c b/lib/nghttp2/lib/nghttp2_session.c +index c0d86026a..51ed4494e 100644 +--- a/lib/nghttp2/lib/nghttp2_session.c ++++ b/lib/nghttp2/lib/nghttp2_session.c +@@ -496,6 +496,7 @@ static int session_new(nghttp2_session **session_ptr, + (*session_ptr)->max_send_header_block_length = NGHTTP2_MAX_HEADERSLEN; + (*session_ptr)->max_outbound_ack = NGHTTP2_DEFAULT_MAX_OBQ_FLOOD_ITEM; + (*session_ptr)->max_settings = NGHTTP2_DEFAULT_MAX_SETTINGS; ++ (*session_ptr)->max_continuations = NGHTTP2_DEFAULT_MAX_CONTINUATIONS; + + if (option) { + if ((option->opt_set_mask & NGHTTP2_OPT_NO_AUTO_WINDOW_UPDATE) && +@@ -6778,6 +6779,8 @@ ssize_t nghttp2_session_mem_recv(nghttp2_session *session, const uint8_t *in, + } + } + session_inbound_frame_reset(session); ++ ++ session->num_continuations = 0; + } + break; + } +@@ -6899,6 +6902,10 @@ ssize_t nghttp2_session_mem_recv(nghttp2_session *session, const uint8_t *in, + } + #endif /* DEBUGBUILD */ + ++ if (++session->num_continuations > session->max_continuations) { ++ return NGHTTP2_ERR_TOO_MANY_CONTINUATIONS; ++ } ++ + readlen = inbound_frame_buf_read(iframe, in, last); + in += readlen; + +diff --git a/lib/nghttp2/lib/nghttp2_session.h b/lib/nghttp2/lib/nghttp2_session.h +index b119329a0..ef8f7b27d 100644 +--- a/lib/nghttp2/lib/nghttp2_session.h ++++ b/lib/nghttp2/lib/nghttp2_session.h +@@ -110,6 +110,10 @@ typedef struct { + #define NGHTTP2_DEFAULT_STREAM_RESET_BURST 1000 + #define NGHTTP2_DEFAULT_STREAM_RESET_RATE 33 + ++/* The default max number of CONTINUATION frames following an incoming ++ HEADER frame. */ ++#define NGHTTP2_DEFAULT_MAX_CONTINUATIONS 8 ++ + /* Internal state when receiving incoming frame */ + typedef enum { + /* Receiving frame header */ +@@ -290,6 +294,12 @@ struct nghttp2_session { + size_t max_send_header_block_length; + /* The maximum number of settings accepted per SETTINGS frame. */ + size_t max_settings; ++ /* The maximum number of CONTINUATION frames following an incoming ++ HEADER frame. */ ++ size_t max_continuations; ++ /* The number of CONTINUATION frames following an incoming HEADER ++ frame. This variable is reset when END_HEADERS flag is seen. */ ++ size_t num_continuations; + /* Next Stream ID. Made unsigned int to detect >= (1 << 31). */ + uint32_t next_stream_id; + /* The last stream ID this session initiated. For client session, diff --git a/SPECS/fluent-bit/fix_issue_8025.patch b/SPECS/fluent-bit/fix_issue_8025.patch deleted file mode 100644 index d5d97590822..00000000000 --- a/SPECS/fluent-bit/fix_issue_8025.patch +++ /dev/null @@ -1,779 +0,0 @@ -From c60999c186c23cff79dad4dd31c838404ace228e Mon Sep 17 00:00:00 2001 -From: "jinyong.choi" -Date: Wed, 18 Oct 2023 23:58:38 +0900 -Subject: [PATCH 1/2] in_tail: Delete unmanaged inodes from db during startup - (#8025) (1/2) - -To prevent incorrect inode references, -FluentBit automatically removes unmanaged inodes during startup. - -Signed-off-by: jinyong.choi ---- - plugins/in_tail/tail.c | 9 ++ - plugins/in_tail/tail_db.c | 161 +++++++++++++++++++++++++++++++ - plugins/in_tail/tail_db.h | 3 + - plugins/in_tail/tail_sql.h | 22 +++++ - tests/runtime/in_tail.c | 189 +++++++++++++++++++++++++++++++++++++ - 5 files changed, 384 insertions(+) - -diff --git a/plugins/in_tail/tail.c b/plugins/in_tail/tail.c -index 34a0fec3dbd..37b1f4f6c68 100644 ---- a/plugins/in_tail/tail.c -+++ b/plugins/in_tail/tail.c -@@ -372,6 +372,15 @@ static int in_tail_init(struct flb_input_instance *in, - /* Scan path */ - flb_tail_scan(ctx->path_list, ctx); - -+#ifdef FLB_HAVE_SQLDB -+ /* Delete stale files that are not monitored from the database */ -+ ret = flb_tail_db_stale_file_delete(in, config, ctx); -+ if (ret == -1) { -+ flb_tail_config_destroy(ctx); -+ return -1; -+ } -+#endif -+ - /* - * After the first scan (on start time), all new files discovered needs to be - * read from head, so we switch the 'read_from_head' flag to true so any -diff --git a/plugins/in_tail/tail_db.c b/plugins/in_tail/tail_db.c -index 664963b6dba..99242f8a15b 100644 ---- a/plugins/in_tail/tail_db.c -+++ b/plugins/in_tail/tail_db.c -@@ -168,6 +168,42 @@ static int db_file_insert(struct flb_tail_file *file, struct flb_tail_config *ct - return flb_sqldb_last_id(ctx->db); - } - -+static int stmt_add_param_concat(struct flb_tail_config *ctx, -+ flb_sds_t *stmt_sql, uint64_t count) -+{ -+ uint64_t idx; -+ flb_sds_t sds_tmp; -+ -+ sds_tmp = flb_sds_cat(*stmt_sql, SQL_STMT_START_PARAM, -+ SQL_STMT_START_PARAM_LEN); -+ if (sds_tmp == NULL) { -+ flb_plg_debug(ctx->ins, "error concatenating stmt_sql: param start"); -+ return -1; -+ } -+ *stmt_sql = sds_tmp; -+ -+ for (idx = 1; idx < count; idx++) { -+ sds_tmp = flb_sds_cat(*stmt_sql, SQL_STMT_ADD_PARAM, -+ SQL_STMT_ADD_PARAM_LEN); -+ if (sds_tmp == NULL) { -+ flb_plg_debug(ctx->ins, "error concatenating stmt_sql: add param"); -+ return -1; -+ } -+ -+ *stmt_sql = sds_tmp; -+ } -+ -+ sds_tmp = flb_sds_cat(*stmt_sql, SQL_STMT_PARAM_END, -+ SQL_STMT_PARAM_END_LEN); -+ if (sds_tmp == NULL) { -+ flb_plg_debug(ctx->ins, "error concatenating stmt_sql: param end"); -+ return -1; -+ } -+ *stmt_sql = sds_tmp; -+ -+ return 0; -+} -+ - int flb_tail_db_file_set(struct flb_tail_file *file, - struct flb_tail_config *ctx) - { -@@ -275,3 +311,128 @@ int flb_tail_db_file_delete(struct flb_tail_file *file, - flb_plg_debug(ctx->ins, "db: file deleted from database: %s", file->name); - return 0; - } -+ -+/* -+ * Delete stale file from database -+ */ -+int flb_tail_db_stale_file_delete(struct flb_input_instance *ins, -+ struct flb_config *config, -+ struct flb_tail_config *ctx) -+{ -+ int ret = -1; -+ size_t sql_size; -+ uint64_t idx; -+ uint64_t file_count = ctx->files_static_count; -+ flb_sds_t stale_delete_sql; -+ flb_sds_t sds_tmp; -+ sqlite3_stmt *stmt_delete_inodes = NULL; -+ struct mk_list *tmp; -+ struct mk_list *head; -+ struct flb_tail_file *file; -+ -+ if (!ctx->db) { -+ return 0; -+ } -+ -+ /* Create a stmt sql buffer */ -+ sql_size = SQL_DELETE_STALE_FILE_START_LEN; -+ sql_size += SQL_DELETE_STALE_FILE_WHERE_LEN; -+ sql_size += SQL_STMT_START_PARAM_LEN; -+ sql_size += SQL_STMT_PARAM_END_LEN; -+ sql_size += SQL_STMT_END_LEN; -+ if (file_count > 0) { -+ sql_size += (SQL_STMT_ADD_PARAM_LEN * file_count); -+ } -+ -+ stale_delete_sql = flb_sds_create_size(sql_size + 1); -+ if (!stale_delete_sql) { -+ flb_plg_error(ctx->ins, "cannot allocate buffer for stale_delete_sql:" -+ " size: %zu", sql_size); -+ return -1; -+ } -+ -+ /* Create a stmt sql */ -+ sds_tmp = flb_sds_cat(stale_delete_sql, SQL_DELETE_STALE_FILE_START, -+ SQL_DELETE_STALE_FILE_START_LEN); -+ if (sds_tmp == NULL) { -+ flb_plg_error(ctx->ins, -+ "error concatenating stale_delete_sql: start"); -+ flb_sds_destroy(stale_delete_sql); -+ return -1; -+ } -+ stale_delete_sql = sds_tmp; -+ -+ if (file_count > 0) { -+ sds_tmp = flb_sds_cat(stale_delete_sql, SQL_DELETE_STALE_FILE_WHERE, -+ SQL_DELETE_STALE_FILE_WHERE_LEN); -+ if (sds_tmp == NULL) { -+ flb_plg_error(ctx->ins, -+ "error concatenating stale_delete_sql: where"); -+ flb_sds_destroy(stale_delete_sql); -+ return -1; -+ } -+ stale_delete_sql = sds_tmp; -+ -+ ret = stmt_add_param_concat(ctx, &stale_delete_sql, file_count); -+ if (ret == -1) { -+ flb_plg_error(ctx->ins, -+ "error concatenating stale_delete_sql: param"); -+ flb_sds_destroy(stale_delete_sql); -+ return -1; -+ } -+ } -+ -+ sds_tmp = flb_sds_cat(stale_delete_sql, SQL_STMT_END, SQL_STMT_END_LEN); -+ if (sds_tmp == NULL) { -+ flb_plg_error(ctx->ins, -+ "error concatenating stale_delete_sql: end"); -+ flb_sds_destroy(stale_delete_sql); -+ return -1; -+ } -+ stale_delete_sql = sds_tmp; -+ -+ /* Prepare stmt */ -+ ret = sqlite3_prepare_v2(ctx->db->handler, stale_delete_sql, -1, -+ &stmt_delete_inodes, 0); -+ if (ret != SQLITE_OK) { -+ flb_plg_error(ctx->ins, "error preparing database SQL statement:" -+ " stmt_delete_inodes sql:%s, ret=%d", stale_delete_sql, -+ ret); -+ flb_sds_destroy(stale_delete_sql); -+ return -1; -+ } -+ -+ /* Bind parameters */ -+ idx = 1; -+ mk_list_foreach_safe(head, tmp, &ctx->files_static) { -+ file = mk_list_entry(head, struct flb_tail_file, _head); -+ ret = sqlite3_bind_int64(stmt_delete_inodes, idx, file->inode); -+ if (ret != SQLITE_OK) { -+ flb_plg_error(ctx->ins, "error binding to stmt_delete_inodes:" -+ " inode=%lu, ret=%d", file->inode, ret); -+ sqlite3_finalize(stmt_delete_inodes); -+ flb_sds_destroy(stale_delete_sql); -+ return -1; -+ } -+ idx++; -+ } -+ -+ /* Run the delete inodes */ -+ ret = sqlite3_step(stmt_delete_inodes); -+ if (ret != SQLITE_DONE) { -+ sqlite3_finalize(stmt_delete_inodes); -+ flb_sds_destroy(stale_delete_sql); -+ flb_plg_error(ctx->ins, "cannot execute delete stale inodes: ret=%d", -+ ret); -+ return -1; -+ } -+ -+ ret = sqlite3_changes(ctx->db->handler); -+ flb_plg_info(ctx->ins, "db: delete unmonitored stale inodes from the" -+ " database: count=%d", ret); -+ -+ sqlite3_finalize(stmt_delete_inodes); -+ flb_sds_destroy(stale_delete_sql); -+ -+ return 0; -+} -diff --git a/plugins/in_tail/tail_db.h b/plugins/in_tail/tail_db.h -index 7b5355d229c..b1fde721d29 100644 ---- a/plugins/in_tail/tail_db.h -+++ b/plugins/in_tail/tail_db.h -@@ -40,4 +40,7 @@ int flb_tail_db_file_rotate(const char *new_name, - struct flb_tail_config *ctx); - int flb_tail_db_file_delete(struct flb_tail_file *file, - struct flb_tail_config *ctx); -+int flb_tail_db_stale_file_delete(struct flb_input_instance *ins, -+ struct flb_config *config, -+ struct flb_tail_config *ctx); - #endif -diff --git a/plugins/in_tail/tail_sql.h b/plugins/in_tail/tail_sql.h -index 855933a0149..bf724f318cd 100644 ---- a/plugins/in_tail/tail_sql.h -+++ b/plugins/in_tail/tail_sql.h -@@ -53,6 +53,28 @@ - #define SQL_DELETE_FILE \ - "DELETE FROM in_tail_files WHERE id=@id;" - -+#define SQL_STMT_START_PARAM "(?" -+#define SQL_STMT_START_PARAM_LEN (sizeof(SQL_STMT_START_PARAM) - 1) -+ -+#define SQL_STMT_ADD_PARAM ",?" -+#define SQL_STMT_ADD_PARAM_LEN (sizeof(SQL_STMT_ADD_PARAM) - 1) -+ -+#define SQL_STMT_PARAM_END ")" -+#define SQL_STMT_PARAM_END_LEN (sizeof(SQL_STMT_PARAM_END) - 1) -+ -+#define SQL_STMT_END ";" -+#define SQL_STMT_END_LEN (sizeof(SQL_STMT_END) - 1) -+ -+#define SQL_DELETE_STALE_FILE_START \ -+ "DELETE FROM in_tail_files " -+#define SQL_DELETE_STALE_FILE_START_LEN \ -+ (sizeof(SQL_DELETE_STALE_FILE_START) - 1) -+ -+#define SQL_DELETE_STALE_FILE_WHERE \ -+ "WHERE inode NOT IN " -+#define SQL_DELETE_STALE_FILE_WHERE_LEN \ -+ (sizeof(SQL_DELETE_STALE_FILE_WHERE) - 1) -+ - #define SQL_PRAGMA_SYNC \ - "PRAGMA synchronous=%i;" - -diff --git a/tests/runtime/in_tail.c b/tests/runtime/in_tail.c -index ee5fba88744..74accb66ed6 100644 ---- a/tests/runtime/in_tail.c -+++ b/tests/runtime/in_tail.c -@@ -1545,6 +1545,194 @@ void flb_test_db() - test_tail_ctx_destroy(ctx); - unlink(db); - } -+ -+void flb_test_db_delete_stale_file() -+{ -+ struct flb_lib_out_cb cb_data; -+ struct test_tail_ctx *ctx; -+ char *org_file[] = {"test_db.log", "test_db_stale.log"}; -+ char *tmp_file[] = {"test_db.log"}; -+ char *path = "test_db.log, test_db_stale.log"; -+ char *move_file[] = {"test_db_stale.log", "test_db_stale_new.log"}; -+ char *new_file[] = {"test_db.log", "test_db_stale_new.log"}; -+ char *new_path = "test_db.log, test_db_stale_new.log"; -+ char *db = "test_db.db"; -+ char *msg_init = "hello world"; -+ char *msg_end = "hello db end"; -+ int i; -+ int ret; -+ int num; -+ int unused; -+ -+ unlink(db); -+ -+ clear_output_num(); -+ -+ cb_data.cb = cb_count_msgpack; -+ cb_data.data = &unused; -+ -+ ctx = test_tail_ctx_create(&cb_data, -+ &org_file[0], -+ sizeof(org_file)/sizeof(char *), -+ FLB_FALSE); -+ if (!TEST_CHECK(ctx != NULL)) { -+ TEST_MSG("test_ctx_create failed"); -+ exit(EXIT_FAILURE); -+ } -+ -+ ret = flb_input_set(ctx->flb, ctx->o_ffd, -+ "path", path, -+ "read_from_head", "true", -+ "db", db, -+ "db.sync", "full", -+ NULL); -+ TEST_CHECK(ret == 0); -+ -+ ret = flb_output_set(ctx->flb, ctx->o_ffd, -+ NULL); -+ TEST_CHECK(ret == 0); -+ -+ /* Start the engine */ -+ ret = flb_start(ctx->flb); -+ TEST_CHECK(ret == 0); -+ -+ ret = write_msg(ctx, msg_init, strlen(msg_init)); -+ if (!TEST_CHECK(ret > 0)) { -+ test_tail_ctx_destroy(ctx); -+ unlink(db); -+ exit(EXIT_FAILURE); -+ } -+ -+ /* waiting to flush */ -+ flb_time_msleep(500); -+ -+ num = get_output_num(); -+ if (!TEST_CHECK(num > 0)) { -+ TEST_MSG("no output"); -+ } -+ -+ if (ctx->fds != NULL) { -+ for (i=0; ifd_num; i++) { -+ close(ctx->fds[i]); -+ } -+ flb_free(ctx->fds); -+ } -+ flb_stop(ctx->flb); -+ flb_destroy(ctx->flb); -+ flb_free(ctx); -+ -+ /* re-init to use db */ -+ clear_output_num(); -+ -+ /* -+ * Changing the file name from 'test_db_stale.log' to -+ * 'test_db_stale_new.log.' In this scenario, it is assumed that the -+ * file was deleted after the FluentBit was terminated. However, since -+ * the FluentBit was shutdown, the inode remains in the database. -+ * The reason for renaming is to preserve the existing file for later use. -+ */ -+ ret = rename(move_file[0], move_file[1]); -+ TEST_CHECK(ret == 0); -+ -+ cb_data.cb = cb_count_msgpack; -+ cb_data.data = &unused; -+ -+ ctx = test_tail_ctx_create(&cb_data, -+ &tmp_file[0], -+ sizeof(tmp_file)/sizeof(char *), -+ FLB_FALSE); -+ if (!TEST_CHECK(ctx != NULL)) { -+ TEST_MSG("test_ctx_create failed"); -+ unlink(db); -+ exit(EXIT_FAILURE); -+ } -+ -+ ret = flb_input_set(ctx->flb, ctx->o_ffd, -+ "path", path, -+ "read_from_head", "true", -+ "db", db, -+ "db.sync", "full", -+ NULL); -+ TEST_CHECK(ret == 0); -+ -+ /* -+ * Start the engine -+ * FluentBit will delete stale inodes. -+ */ -+ ret = flb_start(ctx->flb); -+ TEST_CHECK(ret == 0); -+ -+ /* waiting to flush */ -+ flb_time_msleep(500); -+ -+ if (ctx->fds != NULL) { -+ for (i=0; ifd_num; i++) { -+ close(ctx->fds[i]); -+ } -+ flb_free(ctx->fds); -+ } -+ flb_stop(ctx->flb); -+ flb_destroy(ctx->flb); -+ flb_free(ctx); -+ -+ /* re-init to use db */ -+ clear_output_num(); -+ -+ cb_data.cb = cb_count_msgpack; -+ cb_data.data = &unused; -+ -+ ctx = test_tail_ctx_create(&cb_data, -+ &new_file[0], -+ sizeof(new_file)/sizeof(char *), -+ FLB_FALSE); -+ if (!TEST_CHECK(ctx != NULL)) { -+ TEST_MSG("test_ctx_create failed"); -+ unlink(db); -+ exit(EXIT_FAILURE); -+ } -+ -+ ret = flb_input_set(ctx->flb, ctx->o_ffd, -+ "path", new_path, -+ "read_from_head", "true", -+ "db", db, -+ "db.sync", "full", -+ NULL); -+ TEST_CHECK(ret == 0); -+ -+ /* -+ * Start the engine -+ * 'test_db_stale_new.log.' is a new file. -+ * The inode of 'test_db_stale.log' was deleted previously. -+ * So, it reads from the beginning of the file. -+ */ -+ ret = flb_start(ctx->flb); -+ TEST_CHECK(ret == 0); -+ -+ /* waiting to flush */ -+ flb_time_msleep(500); -+ -+ ret = write_msg(ctx, msg_end, strlen(msg_end)); -+ if (!TEST_CHECK(ret > 0)) { -+ test_tail_ctx_destroy(ctx); -+ unlink(db); -+ exit(EXIT_FAILURE); -+ } -+ -+ /* waiting to flush */ -+ flb_time_msleep(500); -+ -+ num = get_output_num(); -+ if (!TEST_CHECK(num == 3)) { -+ /* 3 = -+ * test_db.log : "hello db end" -+ * test_db_stale.log : "msg_init" + "hello db end" -+ */ -+ TEST_MSG("num error. expect=3 got=%d", num); -+ } -+ -+ test_tail_ctx_destroy(ctx); -+ unlink(db); -+} - #endif /* FLB_HAVE_SQLDB */ - - /* Test list */ -@@ -1569,6 +1757,7 @@ TEST_LIST = { - - #ifdef FLB_HAVE_SQLDB - {"db", flb_test_db}, -+ {"db_delete_stale_file", flb_test_db_delete_stale_file}, - #endif - - #ifdef in_tail - -From d06114cbb1419ef9e8969b897730de07b64cfe28 Mon Sep 17 00:00:00 2001 -From: "jinyong.choi" -Date: Thu, 19 Oct 2023 00:37:36 +0900 -Subject: [PATCH 2/2] in_tail: Introducing the compare_filename option to - db_file_exists (#8025)(2/2) - -When checking the existence of a file's inode, if the 'compare_filename' -option is enabled, it is modified to compare the filename as well. -If the inode matches but the filename is different, it removes the stale -inode from the database. - -Signed-off-by: jinyong.choi ---- - plugins/in_tail/tail.c | 8 ++ - plugins/in_tail/tail_config.h | 1 + - plugins/in_tail/tail_db.c | 58 ++++++++++++- - tests/runtime/in_tail.c | 148 ++++++++++++++++++++++++++++++++++ - 4 files changed, 213 insertions(+), 2 deletions(-) - -diff --git a/plugins/in_tail/tail.c b/plugins/in_tail/tail.c -index 37b1f4f6c68..52bf2ed6d40 100644 ---- a/plugins/in_tail/tail.c -+++ b/plugins/in_tail/tail.c -@@ -734,6 +734,14 @@ static struct flb_config_map config_map[] = { - "provides higher performance. Note that WAL is not compatible with " - "shared network file systems." - }, -+ { -+ FLB_CONFIG_MAP_BOOL, "db.compare_filename", "false", -+ 0, FLB_TRUE, offsetof(struct flb_tail_config, compare_filename), -+ "This option determines whether to check both the inode and the filename " -+ "when retrieving file information from the db." -+ "'true' verifies both the inode and filename, while 'false' checks only " -+ "the inode (default)." -+ }, - #endif - - /* Multiline Options */ -diff --git a/plugins/in_tail/tail_config.h b/plugins/in_tail/tail_config.h -index dcfa54e0264..c0263b46503 100644 ---- a/plugins/in_tail/tail_config.h -+++ b/plugins/in_tail/tail_config.h -@@ -107,6 +107,7 @@ struct flb_tail_config { - struct flb_sqldb *db; - int db_sync; - int db_locking; -+ int compare_filename; - flb_sds_t db_journal_mode; - sqlite3_stmt *stmt_get_file; - sqlite3_stmt *stmt_insert_file; -diff --git a/plugins/in_tail/tail_db.c b/plugins/in_tail/tail_db.c -index 99242f8a15b..6f535ea646b 100644 ---- a/plugins/in_tail/tail_db.c -+++ b/plugins/in_tail/tail_db.c -@@ -95,9 +95,38 @@ int flb_tail_db_close(struct flb_sqldb *db) - return 0; - } - -+static int flb_tail_db_file_delete_by_id(struct flb_tail_config *ctx, -+ uint64_t id) -+{ -+ int ret; -+ -+ /* Bind parameters */ -+ ret = sqlite3_bind_int64(ctx->stmt_delete_file, 1, id); -+ if (ret != SQLITE_OK) { -+ flb_plg_error(ctx->ins, "db: error binding id=%"PRIu64", ret=%d", id, ret); -+ return -1; -+ } -+ -+ ret = sqlite3_step(ctx->stmt_delete_file); -+ -+ sqlite3_clear_bindings(ctx->stmt_delete_file); -+ sqlite3_reset(ctx->stmt_delete_file); -+ -+ if (ret != SQLITE_DONE) { -+ flb_plg_error(ctx->ins, "db: error deleting stale entry from database:" -+ " id=%"PRIu64, id); -+ return -1; -+ } -+ -+ flb_plg_info(ctx->ins, "db: stale file deleted from database:" -+ " id=%"PRIu64, id); -+ return 0; -+} -+ - /* -- * Check if an file inode exists in the database. Return FLB_TRUE or -- * FLB_FALSE -+ * Check if an file inode exists in the database. -+ * If the 'compare_filename' option is enabled, -+ * it checks along with the filename. Return FLB_TRUE or FLB_FALSE - */ - static int db_file_exists(struct flb_tail_file *file, - struct flb_tail_config *ctx, -@@ -105,6 +134,7 @@ static int db_file_exists(struct flb_tail_file *file, - { - int ret; - int exists = FLB_FALSE; -+ const unsigned char *name; - - /* Bind parameters */ - sqlite3_bind_int64(ctx->stmt_get_file, 1, file->inode); -@@ -116,11 +146,30 @@ static int db_file_exists(struct flb_tail_file *file, - /* id: column 0 */ - *id = sqlite3_column_int64(ctx->stmt_get_file, 0); - -+ /* name: column 1 */ -+ name = sqlite3_column_text(ctx->stmt_get_file, 1); -+ if (ctx->compare_filename && name == NULL) { -+ flb_plg_error(ctx->ins, "db: error getting name: id=%"PRIu64, *id); -+ return -1; -+ } -+ - /* offset: column 2 */ - *offset = sqlite3_column_int64(ctx->stmt_get_file, 2); - - /* inode: column 3 */ - *inode = sqlite3_column_int64(ctx->stmt_get_file, 3); -+ -+ /* Checking if the file's name and inode match exactly */ -+ if (ctx->compare_filename) { -+ if (flb_tail_target_file_name_cmp((char *) name, file) != 0) { -+ exists = FLB_FALSE; -+ flb_plg_debug(ctx->ins, "db: exists stale file from database:" -+ " id=%"PRIu64" inode=%"PRIu64" offset=%"PRIu64 -+ " name=%s file_inode=%"PRIu64" file_name=%s", -+ *id, *inode, *offset, name, file->inode, -+ file->name); -+ } -+ } - } - else if (ret == SQLITE_DONE) { - /* all good */ -@@ -221,6 +270,11 @@ int flb_tail_db_file_set(struct flb_tail_file *file, - } - - if (ret == FLB_FALSE) { -+ /* Delete stale file of same inode */ -+ if (ctx->compare_filename && id > 0) { -+ flb_tail_db_file_delete_by_id(ctx, id); -+ } -+ - /* Get the database ID for this file */ - file->db_id = db_file_insert(file, ctx); - } -diff --git a/tests/runtime/in_tail.c b/tests/runtime/in_tail.c -index 74accb66ed6..90d8832bc79 100644 ---- a/tests/runtime/in_tail.c -+++ b/tests/runtime/in_tail.c -@@ -1733,6 +1733,153 @@ void flb_test_db_delete_stale_file() - test_tail_ctx_destroy(ctx); - unlink(db); - } -+ -+void flb_test_db_compare_filename() -+{ -+ struct flb_lib_out_cb cb_data; -+ struct test_tail_ctx *ctx; -+ char *org_file[] = {"test_db.log"}; -+ char *moved_file[] = {"test_db_moved.log"}; -+ char *db = "test_db.db"; -+ char *msg_init = "hello world"; -+ char *msg_moved = "hello world moved"; -+ char *msg_end = "hello db end"; -+ int i; -+ int ret; -+ int num; -+ int unused; -+ -+ unlink(db); -+ -+ clear_output_num(); -+ -+ cb_data.cb = cb_count_msgpack; -+ cb_data.data = &unused; -+ -+ ctx = test_tail_ctx_create(&cb_data, -+ &org_file[0], -+ sizeof(org_file)/sizeof(char *), -+ FLB_FALSE); -+ if (!TEST_CHECK(ctx != NULL)) { -+ TEST_MSG("test_ctx_create failed"); -+ exit(EXIT_FAILURE); -+ } -+ -+ ret = flb_input_set(ctx->flb, ctx->o_ffd, -+ "path", org_file[0], -+ "read_from_head", "true", -+ "db", db, -+ "db.sync", "full", -+ "db.compare_filename", "true", -+ NULL); -+ TEST_CHECK(ret == 0); -+ -+ ret = flb_output_set(ctx->flb, ctx->o_ffd, -+ NULL); -+ TEST_CHECK(ret == 0); -+ -+ /* Start the engine */ -+ ret = flb_start(ctx->flb); -+ TEST_CHECK(ret == 0); -+ -+ ret = write_msg(ctx, msg_init, strlen(msg_init)); -+ if (!TEST_CHECK(ret > 0)) { -+ test_tail_ctx_destroy(ctx); -+ unlink(db); -+ exit(EXIT_FAILURE); -+ } -+ -+ /* waiting to flush */ -+ flb_time_msleep(500); -+ -+ num = get_output_num(); -+ if (!TEST_CHECK(num > 0)) { -+ TEST_MSG("no output"); -+ } -+ -+ if (ctx->fds != NULL) { -+ for (i=0; ifd_num; i++) { -+ close(ctx->fds[i]); -+ } -+ flb_free(ctx->fds); -+ } -+ flb_stop(ctx->flb); -+ flb_destroy(ctx->flb); -+ flb_free(ctx); -+ -+ /* re-init to use db */ -+ clear_output_num(); -+ -+ /* -+ * Changing the file name from 'test_db.log' to 'test_db_moved.log.' -+ * In this scenario, it is assumed that the FluentBit has been terminated, -+ * and the file has been recreated with the same inode, with offsets equal -+ * to or greater than the previous file. -+ */ -+ ret = rename(org_file[0], moved_file[0]); -+ TEST_CHECK(ret == 0); -+ -+ cb_data.cb = cb_count_msgpack; -+ cb_data.data = &unused; -+ -+ ctx = test_tail_ctx_create(&cb_data, -+ &moved_file[0], -+ sizeof(moved_file)/sizeof(char *), -+ FLB_FALSE); -+ if (!TEST_CHECK(ctx != NULL)) { -+ TEST_MSG("test_ctx_create failed"); -+ unlink(db); -+ exit(EXIT_FAILURE); -+ } -+ -+ ret = flb_input_set(ctx->flb, ctx->o_ffd, -+ "path", moved_file[0], -+ "read_from_head", "true", -+ "db", db, -+ "db.sync", "full", -+ "db.compare_filename", "true", -+ NULL); -+ TEST_CHECK(ret == 0); -+ -+ /* -+ * Start the engine -+ * The file has been newly created, and due to the 'db.compare_filename' -+ * option being set to true, it compares filenames to consider it a new -+ * file even if the inode is the same. If the option is set to false, -+ * it can be assumed to be the same file as before. -+ */ -+ ret = flb_start(ctx->flb); -+ TEST_CHECK(ret == 0); -+ -+ /* waiting to flush */ -+ flb_time_msleep(500); -+ -+ ret = write_msg(ctx, msg_moved, strlen(msg_moved)); -+ if (!TEST_CHECK(ret > 0)) { -+ test_tail_ctx_destroy(ctx); -+ unlink(db); -+ exit(EXIT_FAILURE); -+ } -+ -+ ret = write_msg(ctx, msg_end, strlen(msg_end)); -+ if (!TEST_CHECK(ret > 0)) { -+ test_tail_ctx_destroy(ctx); -+ unlink(db); -+ exit(EXIT_FAILURE); -+ } -+ -+ /* waiting to flush */ -+ flb_time_msleep(500); -+ -+ num = get_output_num(); -+ if (!TEST_CHECK(num == 3)) { -+ /* 3 = msg_init + msg_moved + msg_end */ -+ TEST_MSG("num error. expect=3 got=%d", num); -+ } -+ -+ test_tail_ctx_destroy(ctx); -+ unlink(db); -+} - #endif /* FLB_HAVE_SQLDB */ - - /* Test list */ -@@ -1758,6 +1905,7 @@ TEST_LIST = { - #ifdef FLB_HAVE_SQLDB - {"db", flb_test_db}, - {"db_delete_stale_file", flb_test_db_delete_stale_file}, -+ {"db_compare_filename", flb_test_db_compare_filename}, - #endif - - #ifdef in_tail diff --git a/SPECS/fluent-bit/fluent-bit.signatures.json b/SPECS/fluent-bit/fluent-bit.signatures.json index bf91d54c81c..bd04e715199 100644 --- a/SPECS/fluent-bit/fluent-bit.signatures.json +++ b/SPECS/fluent-bit/fluent-bit.signatures.json @@ -1,5 +1,5 @@ { - "Signatures": { - "fluent-bit-2.2.3.tar.gz": "006ed94d34e4036fb7fb5a02016ccf3a55d7f5ccdefd5df756d1ba2206cfc55d" - } -} + "Signatures": { + "fluent-bit-3.0.6.tar.gz": "2cad0ac1e04646bc084b7bb3d5552589fa1997eaa5ba3fe2137a65ecf101cd9f" + } +} \ No newline at end of file diff --git a/SPECS/fluent-bit/fluent-bit.spec b/SPECS/fluent-bit/fluent-bit.spec index cb655377a7a..4fa8679d97a 100644 --- a/SPECS/fluent-bit/fluent-bit.spec +++ b/SPECS/fluent-bit/fluent-bit.spec @@ -1,19 +1,17 @@ Summary: Fast and Lightweight Log processor and forwarder for Linux, BSD and OSX Name: fluent-bit -Version: 2.2.3 -Release: 6%{?dist} +Version: 3.0.6 +Release: 1%{?dist} License: Apache-2.0 Vendor: Microsoft Corporation Distribution: Mariner URL: https://fluentbit.io Source0: https://github.com/fluent/%{name}/archive/refs/tags/v%{version}.tar.gz#/%{name}-%{version}.tar.gz Patch0: CVE-2024-34250.patch -Patch1: in_emitter_fix_issue_8198.patch -Patch2: fix_issue_8025.patch -Patch3: CVE-2024-26455.patch -Patch4: CVE-2024-25629.patch -Patch5: CVE-2024-25431.patch -Patch6: CVE-2024-27532.patch +Patch1: CVE-2024-25629.patch +Patch2: CVE-2024-28182.patch +Patch3: CVE-2024-25431.patch +Patch4: CVE-2024-27532.patch BuildRequires: bison BuildRequires: cmake BuildRequires: cyrus-sasl-devel @@ -87,6 +85,11 @@ Development files for %{name} %{_libdir}/fluent-bit/*.so %changelog +* Mon Dec 30 2024 Sudipta Pandit - 3.0.6-1 +- Bump version to 3.0.6 +- Add patches for multiple CVEs for the current version +- Remove patches for multiple fixes not required for this version + * Tue Dec 10 2024 Sudipta Pandit - 2.2.3-6 - Backport fix for CVE-2024-27532 diff --git a/SPECS/fluent-bit/in_emitter_fix_issue_8198.patch b/SPECS/fluent-bit/in_emitter_fix_issue_8198.patch deleted file mode 100644 index d9861ab126d..00000000000 --- a/SPECS/fluent-bit/in_emitter_fix_issue_8198.patch +++ /dev/null @@ -1,661 +0,0 @@ -From feb424367d08666dd9fb0a6405f05c19b6678873 Mon Sep 17 00:00:00 2001 -From: Richard Treu -Date: Fri, 9 Feb 2024 23:46:32 +0100 -Subject: [PATCH 1/6] in_emitter: Fix to prevent single record chunks and do - pause on mem_buf_limit - -The current code creates a situation, where only one record per chunk - is created. In case of a non-existing ring-buffer, the old mechanism is used. - -Also the in_emitter plugin continued to accept records even after the -set emitter_mem_buf_limit was reached. This commit implements a -check if the plugin was paused and returns accordingly. - -Signed-off-by: Richard Treu ---- - plugins/in_emitter/emitter.c | 67 +++++++++++++++++++++++++++++++++--- - 1 file changed, 62 insertions(+), 5 deletions(-) - -diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c -index 62886d1346c..532a629b924 100644 ---- a/plugins/in_emitter/emitter.c -+++ b/plugins/in_emitter/emitter.c -@@ -31,6 +31,9 @@ - - #define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000 - -+/* return values */ -+#define FLB_EMITTER_BUSY 3 -+ - struct em_chunk { - flb_sds_t tag; - struct msgpack_sbuffer mp_sbuf; /* msgpack sbuffer */ -@@ -39,6 +42,7 @@ struct em_chunk { - }; - - struct flb_emitter { -+ int coll_fd; /* collector id */ - struct mk_list chunks; /* list of all pending chunks */ - struct flb_input_instance *ins; /* input instance */ - struct flb_ring_buffer *msgs; /* ring buffer for cross-thread messages */ -@@ -97,7 +101,6 @@ int static do_in_emitter_add_record(struct em_chunk *ec, - em_chunk_destroy(ec); - return -1; - } -- /* Release the echunk */ - em_chunk_destroy(ec); - return 0; - } -@@ -118,6 +121,12 @@ int in_emitter_add_record(const char *tag, int tag_len, - ctx = (struct flb_emitter *) in->context; - ec = NULL; - -+ /* Restricted by mem_buf_limit */ -+ if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { -+ flb_plg_debug(ctx->ins, "emitter memory buffer limit reached. Not accepting record."); -+ return FLB_EMITTER_BUSY; -+ } -+ - /* Use the ring buffer first if it exists */ - if (ctx->msgs) { - memset(&temporary_chunk, 0, sizeof(struct em_chunk)); -@@ -161,8 +170,7 @@ int in_emitter_add_record(const char *tag, int tag_len, - - /* Append raw msgpack data */ - msgpack_sbuffer_write(&ec->mp_sbuf, buf_data, buf_size); -- -- return do_in_emitter_add_record(ec, in); -+ return 0; - } - - /* -@@ -191,6 +199,34 @@ static int in_emitter_ingest_ring_buffer(struct flb_input_instance *in, - return ret; - } - -+static int cb_queue_chunks(struct flb_input_instance *in, -+ struct flb_config *config, void *data) -+{ -+ int ret; -+ struct mk_list *tmp; -+ struct mk_list *head; -+ struct em_chunk *echunk; -+ struct flb_emitter *ctx; -+ -+ /* Get context */ -+ ctx = (struct flb_emitter *) data; -+ -+ /* Try to enqueue chunks under our limits */ -+ mk_list_foreach_safe(head, tmp, &ctx->chunks) { -+ echunk = mk_list_entry(head, struct em_chunk, _head); -+ -+ /* Associate this backlog chunk to this instance into the engine */ -+ ret = do_in_emitter_add_record(echunk, in); -+ if (ret == -1) { -+ flb_error("[in_emitter] error registering chunk with tag: %s", -+ echunk->tag); -+ continue; -+ } -+ } -+ -+ return 0; -+} -+ - static int in_emitter_start_ring_buffer(struct flb_input_instance *in, struct flb_emitter *ctx) - { - if (ctx->ring_buffer_size <= 0) { -@@ -257,6 +293,15 @@ static int cb_emitter_init(struct flb_input_instance *in, - return -1; - } - } -+ else{ -+ ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 50000000, config); -+ if (ret < 0) { -+ flb_error("[in_emitter] could not create collector"); -+ flb_free(ctx); -+ return -1; -+ } -+ ctx->coll_fd = ret; -+ } - - /* export plugin context */ - flb_input_set_context(in, ctx); -@@ -264,6 +309,18 @@ static int cb_emitter_init(struct flb_input_instance *in, - return 0; - } - -+static void cb_emitter_pause(void *data, struct flb_config *config) -+{ -+ struct flb_emitter *ctx = data; -+ flb_input_collector_pause(ctx->coll_fd, ctx->ins); -+} -+ -+static void cb_emitter_resume(void *data, struct flb_config *config) -+{ -+ struct flb_emitter *ctx = data; -+ flb_input_collector_resume(ctx->coll_fd, ctx->ins); -+} -+ - static int cb_emitter_exit(void *data, struct flb_config *config) - { - struct mk_list *tmp; -@@ -312,8 +369,8 @@ struct flb_input_plugin in_emitter_plugin = { - .cb_ingest = NULL, - .cb_flush_buf = NULL, - .config_map = config_map, -- .cb_pause = NULL, -- .cb_resume = NULL, -+ .cb_pause = cb_emitter_pause, -+ .cb_resume = cb_emitter_resume, - .cb_exit = cb_emitter_exit, - - /* This plugin can only be configured and invoked by the Engine only */ - -From 37826b66b29d1ad867d220313178c3feac9b792a Mon Sep 17 00:00:00 2001 -From: Richard Treu -Date: Thu, 11 Apr 2024 23:53:10 +0200 -Subject: [PATCH 2/6] filter_multiline: Pause source input plugins on filter - pause This commit will pause the inputs (sending to multiline) to not loose - any in-flight records. - -Signed-off-by: Richard Treu ---- - plugins/filter_multiline/ml.c | 14 ++++++++++++-- - plugins/filter_multiline/ml.h | 4 +++- - 2 files changed, 15 insertions(+), 3 deletions(-) - -diff --git a/plugins/filter_multiline/ml.c b/plugins/filter_multiline/ml.c -index 41b1b8a4d64..ced8ec83739 100644 ---- a/plugins/filter_multiline/ml.c -+++ b/plugins/filter_multiline/ml.c -@@ -176,7 +176,7 @@ static int flush_callback(struct flb_ml_parser *parser, - /* Emit record with original tag */ - flb_plg_trace(ctx->ins, "emitting from %s to %s", stream->input_name, stream->tag); - ret = in_emitter_add_record(stream->tag, flb_sds_len(stream->tag), buf_data, buf_size, -- ctx->ins_emitter); -+ ctx->ins_emitter, ctx->i_ins); - - return ret; - } -@@ -526,7 +526,8 @@ static void partial_timer_cb(struct flb_config *config, void *data) - ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag), - packer->log_encoder.output_buffer, - packer->log_encoder.output_length, -- ctx->ins_emitter); -+ ctx->ins_emitter, -+ ctx->i_ins); - if (ret < 0) { - /* this shouldn't happen in normal execution */ - flb_plg_warn(ctx->ins, -@@ -741,6 +742,15 @@ static int cb_ml_filter(const void *data, size_t bytes, - return FLB_FILTER_NOTOUCH; - } - -+ if (ctx->i_ins == NULL){ -+ ctx->i_ins = i_ins; -+ } -+ if (ctx->i_ins != i_ins) { -+ flb_plg_trace(ctx->ins, "input instance changed from %s to %s", -+ ctx->i_ins->name, i_ins->name); -+ ctx->i_ins = i_ins; -+ } -+ - /* 'partial_message' mode */ - if (ctx->partial_mode == FLB_TRUE) { - return ml_filter_partial(data, bytes, tag, tag_len, -diff --git a/plugins/filter_multiline/ml.h b/plugins/filter_multiline/ml.h -index 59bf6c7e826..cae8fb64166 100644 ---- a/plugins/filter_multiline/ml.h -+++ b/plugins/filter_multiline/ml.h -@@ -73,6 +73,7 @@ struct ml_ctx { - size_t emitter_mem_buf_limit; /* Emitter buffer limit */ - struct flb_input_instance *ins_emitter; /* emitter input plugin instance */ - struct flb_config *config; /* Fluent Bit context */ -+ struct flb_input_instance *i_ins; /* Fluent Bit input instance (last used)*/ - - #ifdef FLB_HAVE_METRICS - struct cmt_counter *cmt_emitted; -@@ -82,6 +83,7 @@ struct ml_ctx { - /* Register external function to emit records, check 'plugins/in_emitter' */ - int in_emitter_add_record(const char *tag, int tag_len, - const char *buf_data, size_t buf_size, -- struct flb_input_instance *in); -+ struct flb_input_instance *in, -+ struct flb_input_instance *i_ins); - - #endif - -From 2087601806b39719ac64c2862f81e7c5222efd3a Mon Sep 17 00:00:00 2001 -From: Richard Treu -Date: Thu, 11 Apr 2024 23:55:40 +0200 -Subject: [PATCH 3/6] filter_rewrite_tag: Pause source input plugins on filter - pause This commit will pause the inputs (sending to rewrite_tag) to not loose - any in-flight records. - -Signed-off-by: Richard Treu ---- - plugins/filter_rewrite_tag/rewrite_tag.c | 7 ++++--- - plugins/filter_rewrite_tag/rewrite_tag.h | 3 ++- - 2 files changed, 6 insertions(+), 4 deletions(-) - -diff --git a/plugins/filter_rewrite_tag/rewrite_tag.c b/plugins/filter_rewrite_tag/rewrite_tag.c -index 01b0f168fe2..c8bfe029350 100644 ---- a/plugins/filter_rewrite_tag/rewrite_tag.c -+++ b/plugins/filter_rewrite_tag/rewrite_tag.c -@@ -355,7 +355,8 @@ static int ingest_inline(struct flb_rewrite_tag *ctx, - */ - static int process_record(const char *tag, int tag_len, msgpack_object map, - const void *buf, size_t buf_size, int *keep, -- struct flb_rewrite_tag *ctx, int *matched) -+ struct flb_rewrite_tag *ctx, int *matched, -+ struct flb_input_instance *i_ins) - { - int ret; - flb_sds_t out_tag; -@@ -404,7 +405,7 @@ static int process_record(const char *tag, int tag_len, msgpack_object map, - if (!ret) { - /* Emit record with new tag */ - ret = in_emitter_add_record(out_tag, flb_sds_len(out_tag), buf, buf_size, -- ctx->ins_emitter); -+ ctx->ins_emitter, i_ins); - } - else { - ret = 0; -@@ -489,7 +490,7 @@ static int cb_rewrite_tag_filter(const void *data, size_t bytes, - * If a record was emitted, the variable 'keep' will define if the record must - * be preserved or not. - */ -- is_emitted = process_record(tag, tag_len, map, (char *) data + pre, off - pre, &keep, ctx, &is_matched); -+ is_emitted = process_record(tag, tag_len, map, (char *) data + pre, off - pre, &keep, ctx, &is_matched, i_ins); - if (is_emitted == FLB_TRUE) { - /* A record with the new tag was emitted */ - emitted_num++; -diff --git a/plugins/filter_rewrite_tag/rewrite_tag.h b/plugins/filter_rewrite_tag/rewrite_tag.h -index 11c0535fde1..d73b49f12eb 100644 ---- a/plugins/filter_rewrite_tag/rewrite_tag.h -+++ b/plugins/filter_rewrite_tag/rewrite_tag.h -@@ -57,7 +57,8 @@ struct flb_rewrite_tag { - /* Register external function to emit records, check 'plugins/in_emitter' */ - int in_emitter_add_record(const char *tag, int tag_len, - const char *buf_data, size_t buf_size, -- struct flb_input_instance *in); -+ struct flb_input_instance *in, -+ struct flb_input_instance *i_ins); - int in_emitter_get_collector_id(struct flb_input_instance *in); - - - -From 64214ada1ded5afc1dae042473b50fa1f8dc9467 Mon Sep 17 00:00:00 2001 -From: Richard Treu -Date: Thu, 11 Apr 2024 23:57:15 +0200 -Subject: [PATCH 4/6] in_emitter: Pause source input plugins on in_emitter - pause This commit will pause all known inputs (sending to multiline) to not - loose any in-flight records. in_emitter will keep track of all sending input - plugins and actively pause/resume them in case in_emitter is paused/resumed. - -Signed-off-by: Richard Treu ---- - plugins/in_emitter/emitter.c | 77 ++++++++++++++++++++++++++++++++++-- - 1 file changed, 73 insertions(+), 4 deletions(-) - -diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c -index 532a629b924..8092a7954ee 100644 ---- a/plugins/in_emitter/emitter.c -+++ b/plugins/in_emitter/emitter.c -@@ -32,7 +32,7 @@ - #define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000 - - /* return values */ --#define FLB_EMITTER_BUSY 3 -+#define FLB_EMITTER_BUSY -2 - - struct em_chunk { - flb_sds_t tag; -@@ -41,12 +41,18 @@ struct em_chunk { - struct mk_list _head; - }; - -+struct input_ref { -+ struct flb_input_instance *i_ins; -+ struct mk_list _head; -+}; -+ - struct flb_emitter { - int coll_fd; /* collector id */ - struct mk_list chunks; /* list of all pending chunks */ - struct flb_input_instance *ins; /* input instance */ - struct flb_ring_buffer *msgs; /* ring buffer for cross-thread messages */ - int ring_buffer_size; /* size of the ring buffer */ -+ struct mk_list i_ins_list; /* instance list of linked/sending inputs */ - }; - - struct em_chunk *em_chunk_create(const char *tag, int tag_len, -@@ -89,6 +95,12 @@ int static do_in_emitter_add_record(struct em_chunk *ec, - struct flb_emitter *ctx = (struct flb_emitter *) in->context; - int ret; - -+ if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { -+ flb_plg_debug(ctx->ins, "_emitter %s paused. Not processing records.", -+ ctx->ins->name); -+ return FLB_EMITTER_BUSY; -+ } -+ - /* Associate this backlog chunk to this instance into the engine */ - ret = flb_input_log_append(in, - ec->tag, flb_sds_len(ec->tag), -@@ -111,15 +123,45 @@ int static do_in_emitter_add_record(struct em_chunk *ec, - */ - int in_emitter_add_record(const char *tag, int tag_len, - const char *buf_data, size_t buf_size, -- struct flb_input_instance *in) -+ struct flb_input_instance *in, -+ struct flb_input_instance *i_ins) - { - struct em_chunk temporary_chunk; - struct mk_list *head; -+ struct input_ref *i_ref; -+ bool ref_found; -+ struct mk_list *tmp; -+ - struct em_chunk *ec; - struct flb_emitter *ctx; - - ctx = (struct flb_emitter *) in->context; - ec = NULL; -+ /* Iterate over list of already known (source) inputs */ -+ /* If new, add it to the list to be able to pause it later on */ -+ ref_found = false; -+ mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { -+ i_ref = mk_list_entry(head, struct input_ref, _head); -+ if(i_ref->i_ins == i_ins){ -+ ref_found = true; -+ break; -+ } -+ } -+ if (!ref_found) { -+ i_ref = flb_malloc(sizeof(struct input_ref)); -+ if (!i_ref) { -+ flb_errno(); -+ return FLB_FILTER_NOTOUCH; -+ } -+ i_ref->i_ins = i_ins; -+ mk_list_add(&i_ref->_head, &ctx->i_ins_list); -+ /* If in_emitter is paused, but new input plugin is not paused, pause it */ -+ if (flb_input_buf_paused(ctx->ins) == FLB_TRUE && -+ flb_input_buf_paused(i_ins) == FLB_FALSE) { -+ flb_input_pause(i_ins); -+ } -+ } -+ - - /* Restricted by mem_buf_limit */ - if (flb_input_buf_paused(ctx->ins) == FLB_TRUE) { -@@ -268,6 +310,8 @@ static int cb_emitter_init(struct flb_input_instance *in, - ctx->ins = in; - mk_list_init(&ctx->chunks); - -+ mk_list_init(&ctx->i_ins_list); -+ - - ret = flb_input_config_map_set(in, (void *) ctx); - if (ret == -1) { -@@ -294,7 +338,7 @@ static int cb_emitter_init(struct flb_input_instance *in, - } - } - else{ -- ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 50000000, config); -+ ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 25000000, config); - if (ret < 0) { - flb_error("[in_emitter] could not create collector"); - flb_free(ctx); -@@ -312,13 +356,31 @@ static int cb_emitter_init(struct flb_input_instance *in, - static void cb_emitter_pause(void *data, struct flb_config *config) - { - struct flb_emitter *ctx = data; -+ struct mk_list *tmp; -+ struct mk_list *head; -+ struct input_ref *i_ref; -+ -+ /* Pause all known senders */ - flb_input_collector_pause(ctx->coll_fd, ctx->ins); -+ mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { -+ i_ref = mk_list_entry(head, struct input_ref, _head); -+ flb_input_pause(i_ref->i_ins); -+ } - } - - static void cb_emitter_resume(void *data, struct flb_config *config) - { - struct flb_emitter *ctx = data; -+ struct mk_list *tmp; -+ struct mk_list *head; -+ struct input_ref *i_ref; -+ -+ /* Resume all known senders */ - flb_input_collector_resume(ctx->coll_fd, ctx->ins); -+ mk_list_foreach_safe(head, tmp, &ctx->i_ins_list) { -+ i_ref = mk_list_entry(head, struct input_ref, _head); -+ flb_input_resume(i_ref->i_ins); -+ } - } - - static int cb_emitter_exit(void *data, struct flb_config *config) -@@ -328,9 +390,9 @@ static int cb_emitter_exit(void *data, struct flb_config *config) - struct flb_emitter *ctx = data; - struct em_chunk *echunk; - struct em_chunk ec; -+ struct input_ref *i_ref; - int ret; - -- - mk_list_foreach_safe(head, tmp, &ctx->chunks) { - echunk = mk_list_entry(head, struct em_chunk, _head); - mk_list_del(&echunk->_head); -@@ -346,6 +408,13 @@ static int cb_emitter_exit(void *data, struct flb_config *config) - flb_ring_buffer_destroy(ctx->msgs); - } - -+ mk_list_foreach_safe(head,tmp, &ctx->i_ins_list) { -+ i_ref = mk_list_entry(head, struct input_ref, _head); -+ mk_list_del(&i_ref->_head); -+ flb_free(i_ref); -+ } -+ -+ - flb_free(ctx); - return 0; - } - -From f6137ec60bdffc6f5c80e491b463541702438772 Mon Sep 17 00:00:00 2001 -From: Richard Treu -Date: Fri, 12 Apr 2024 00:00:39 +0200 -Subject: [PATCH 5/6] flb_input: Add missing input resume message This commit - will add a resume message, when a paused input plugin is resumed. - -Signed-off-by: Richard Treu ---- - src/flb_input.c | 1 + - 1 file changed, 1 insertion(+) - -diff --git a/src/flb_input.c b/src/flb_input.c -index a990a9d2805..7b614ccdb44 100644 ---- a/src/flb_input.c -+++ b/src/flb_input.c -@@ -1729,6 +1729,7 @@ int flb_input_resume(struct flb_input_instance *ins) - flb_input_thread_instance_resume(ins); - } - else { -+ flb_info("[input] resume %s", flb_input_name(ins)); - ins->p->cb_resume(ins->context, ins->config); - } - } - -From 3162d0c3db2f7df9392c6d880280b923002066b1 Mon Sep 17 00:00:00 2001 -From: Richard Treu -Date: Fri, 12 Apr 2024 00:02:03 +0200 -Subject: [PATCH 6/6] tests: filter_multiline: Add test for in_emitter pause by - using multiline This commit will add a test for pause functionality of - in_emitter. The test uses a small emitter buffer size, so the in_emitter will - definitely be paused. - -Signed-off-by: Richard Treu ---- - tests/runtime/filter_multiline.c | 124 +++++++++++++++++++++++++++++++ - 1 file changed, 124 insertions(+) - -diff --git a/tests/runtime/filter_multiline.c b/tests/runtime/filter_multiline.c -index 18253a5b2c7..ed6ffb6b7cb 100644 ---- a/tests/runtime/filter_multiline.c -+++ b/tests/runtime/filter_multiline.c -@@ -2,6 +2,7 @@ - - #include - #include -+#include - #include "flb_tests_runtime.h" - - struct filter_test { -@@ -120,7 +121,34 @@ static int cb_check_str_list(void *record, size_t size, void *data) - return 0; - } - -+void wait_with_timeout(uint32_t timeout_ms, int *output_num, int expected) -+{ -+ struct flb_time start_time; -+ struct flb_time end_time; -+ struct flb_time diff_time; -+ uint64_t elapsed_time_flb = 0; -+ -+ flb_time_get(&start_time); -+ -+ while (true) { -+ *output_num = get_output_num(); -+ -+ if (*output_num == expected) { -+ break; -+ } -+ -+ flb_time_msleep(100); -+ flb_time_get(&end_time); -+ flb_time_diff(&end_time, &start_time, &diff_time); -+ elapsed_time_flb = flb_time_to_nanosec(&diff_time) / 1000000; - -+ if (elapsed_time_flb > timeout_ms) { -+ flb_warn("[timeout] elapsed_time: %ld", elapsed_time_flb); -+ // Reached timeout. -+ break; -+ } -+ } -+} - - static struct filter_test *filter_test_create(struct flb_lib_out_cb *data) - { -@@ -682,6 +710,100 @@ static void flb_test_ml_buffered_16_streams() - filter_test_destroy(ctx); - } - -+/* This test will test the pausing of in_emitter */ -+static void flb_test_ml_buffered_16_streams_pausing() -+{ -+ struct flb_lib_out_cb cb_data; -+ struct filter_test *ctx; -+ int i_ffds[16] = {0}; -+ int ffd_num = sizeof(i_ffds)/sizeof(int); -+ int ret; -+ int i; -+ int j; -+ int bytes; -+ int len; -+ char line_buf[2048] = {0}; -+ char tag_buf[32] = {0}; -+ int line_num; -+ int num; -+ -+ char *expected_strs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property\\n at com.example.myproject.Author.getBookIds(xx.java:38)\\n at com.example.myproject.Bootstrap.main(Bootstrap.java:14)\\nCaused by: java.lang.NullPointerException\\n at com.example.myproject.Book.getId(Book.java:22)\\n at com.example.myproject.Author.getBookIds(Author.java:35)\\n ... 1 more"}; -+ -+ struct str_list expected = { -+ .size = sizeof(expected_strs)/sizeof(char*), -+ .lists = &expected_strs[0], -+ .ignore_min_line_num = 64, -+ }; -+ -+ char *ml_logs[] = {"Exception in thread main java.lang.IllegalStateException: ..null property", -+ " at com.example.myproject.Author.getBookIds(xx.java:38)", -+ " at com.example.myproject.Bootstrap.main(Bootstrap.java:14)", -+ "Caused by: java.lang.NullPointerException", -+ " at com.example.myproject.Book.getId(Book.java:22)", -+ " at com.example.myproject.Author.getBookIds(Author.java:35)", -+ " ... 1 more", -+ "single line"}; -+ -+ cb_data.cb = cb_check_str_list; -+ cb_data.data = (void *)&expected; -+ -+ clear_output_num(); -+ -+ line_num = sizeof(ml_logs)/sizeof(char*); -+ -+ /* Create test context */ -+ ctx = filter_test_create((void *) &cb_data); -+ if (!ctx) { -+ exit(EXIT_FAILURE); -+ } -+ flb_service_set(ctx->flb, -+ "Flush", "0.100000000", -+ "Grace", "2", -+ NULL); -+ -+ i_ffds[0] = ctx->i_ffd; -+ for (i=1; iflb, (char *) "lib", NULL); -+ TEST_CHECK(i_ffds[i] >= 0); -+ sprintf(&tag_buf[0], "test%d", i); -+ flb_input_set(ctx->flb, i_ffds[i], "tag", tag_buf, NULL); -+ } -+ -+ /* Configure filter */ -+ /* Set mem_buf_limit small, so in_emitter will be paused */ -+ ret = flb_filter_set(ctx->flb, ctx->f_ffd, -+ "multiline.key_content", "log", -+ "multiline.parser", "java", -+ "buffer", "on", -+ "debug_flush", "on", -+ "emitter_mem_buf_limit", "1k", -+ NULL); -+ TEST_CHECK(ret == 0); -+ -+ -+ /* Start the engine */ -+ ret = flb_start(ctx->flb); -+ TEST_CHECK(ret == 0); -+ -+ for (i=0; iflb, i_ffds[j], &line_buf[0], len); -+ TEST_CHECK(bytes == len); -+ } -+ } -+ wait_with_timeout(20000, &num, ffd_num); -+ -+ if (!TEST_CHECK(num > 0)) { -+ TEST_MSG("output error. got %d expect more than 0 records.", num); -+ /* The internal flb_lib_push cannot be paused, so records may be lost */ -+ /* However, there should be at least some records */ -+ } -+ -+ filter_test_destroy(ctx); -+} -+ - - - -@@ -695,5 +817,7 @@ TEST_LIST = { - - {"flb_test_multiline_partial_message_concat" , flb_test_multiline_partial_message_concat }, - {"flb_test_multiline_partial_message_concat_two_ids" , flb_test_multiline_partial_message_concat_two_ids }, -+ -+ {"ml_buffered_16_streams_pausing" , flb_test_ml_buffered_16_streams_pausing }, - {NULL, NULL} - }; diff --git a/cgmanifest.json b/cgmanifest.json index 422f809c609..bacdcd1e48a 100644 --- a/cgmanifest.json +++ b/cgmanifest.json @@ -3708,8 +3708,8 @@ "type": "other", "other": { "name": "fluent-bit", - "version": "2.2.3", - "downloadUrl": "https://github.com/fluent/fluent-bit/archive/refs/tags/v2.2.3.tar.gz" + "version": "3.0.6", + "downloadUrl": "https://github.com/fluent/fluent-bit/archive/refs/tags/v3.0.6.tar.gz" } } },