"""NiFi user script that tracks which documents have already been annotated.

The script reads one JSON record from stdin and, depending on the operation
mode, either checks the SQLite index for existing annotations of the record's
document ("check") or stores a new annotation id ("insert").

Settings are passed as ``key=value`` items through the "Command Arguments"
property in NiFi:

- ``index_db_file_name``: file name of the SQLite index database
- ``document_id_field``: name of the document id field
- ``user_script_db_dir``: folder holding the database (overrides the
  ``USER_SCRIPT_DB_DIR`` environment variable)
- ``log_file_name``: file name of the error log
- ``operation_mode``: ``check`` or ``insert``
"""
import os
import json
import traceback
import sys
from utils.sqlite_query import connect_and_query, check_db_exists, create_db_from_file

ANNOTATION_DB_SQL_FILE_PATH = "/opt/data/cogstack-db/sqlite/schemas/annotations_nlp_create_schema.sql"

# Every setting gets a module-level default so a missing command argument
# produces a clear, logged error instead of a NameError.
USER_SCRIPT_DB_DIR = os.environ.get("USER_SCRIPT_DB_DIR")
INDEX_DB_FILE_NAME = None
DOCUMENT_ID_FIELD_NAME = None
LOG_FILE_NAME = None

# possible values:
#   - check - check if a document ID has already been annotated
#   - insert - inserts new annotation(s) into DB
OPERATION_MODE = "check"

# Get the arguments from the "Command Arguments" property in NiFi; only the
# items after the first one (the script name) are settings.
for arg in sys.argv[1:]:
    _key, _separator, _value = arg.partition("=")
    if not _separator:
        # Not a key=value pair: ignore it rather than crash on a missing value.
        continue
    if _key == "index_db_file_name":
        INDEX_DB_FILE_NAME = _value
    elif _key == "document_id_field":
        DOCUMENT_ID_FIELD_NAME = _value
    elif _key == "user_script_db_dir":
        USER_SCRIPT_DB_DIR = _value
    elif _key == "log_file_name":
        LOG_FILE_NAME = _value
    elif _key == "operation_mode":
        OPERATION_MODE = _value


def _sql_string_literal(value):
    """Return *value* as a single-quoted SQL string literal, quotes escaped."""
    return "'" + str(value).replace("'", "''") + "'"


def _escape_like(value):
    """Escape the LIKE wildcards in *value* (backslash is the escape char)."""
    return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def _log_exception(log_file_path):
    """Append the active traceback to the log file, or to stderr as fallback."""
    details = traceback.format_exc()
    if log_file_path is not None:
        try:
            # Append mode creates the file when it does not exist yet.
            with open(log_file_path, "a", encoding="utf-8") as log_file:
                log_file.write("\n" + details)
            return
        except OSError:
            # The log file is unusable; do not lose the traceback.
            pass
    sys.stderr.write(details)


def main():
    """Process one JSON record from stdin according to ``OPERATION_MODE``.

    Returns:
        In "check" mode, the number of matching annotation rows when more
        than one exists, otherwise the parsed input record. In "insert"
        mode, the parsed input record. An empty dict when processing failed
        or the mode is unknown; failures are written to the log file (or to
        stderr when no usable log file is configured).
    """
    input_stream = sys.stdin.read()
    output_stream = {}

    # Resolved before the try block so the error handler can always use it.
    log_file_path = None
    if LOG_FILE_NAME is not None:
        logs_dir = str(os.environ.get("USER_SCRIPT_LOGS_DIR", "/opt/nifi/user-scripts/logs/"))
        log_file_path = os.path.join(logs_dir, str(LOG_FILE_NAME))

    try:
        if USER_SCRIPT_DB_DIR is None or INDEX_DB_FILE_NAME is None:
            raise ValueError("user_script_db_dir and index_db_file_name must be provided")

        db_file_path = os.path.join(USER_SCRIPT_DB_DIR, INDEX_DB_FILE_NAME)
        json_data_record = json.loads(input_stream)

        # Create the schema on first use.
        if not check_db_exists("annotations", db_file_path):
            create_db_from_file(ANNOTATION_DB_SQL_FILE_PATH, db_file_path)

        if OPERATION_MODE == "check":
            document_id = str(json_data_record["content"][0]["id"])
            # Insert mode stores ids as "<doc id>_<nlp id>", so this matches
            # every row with the "<doc id>_" prefix. Wildcards inside the
            # document id are escaped so they are compared literally.
            pattern = _escape_like(document_id) + "\\_%"
            query = (
                "SELECT * FROM annotations WHERE elasticsearch_id LIKE "
                + _sql_string_literal(pattern)
                + " ESCAPE '\\'"
            )
            result = connect_and_query(query, db_file_path)

            # NOTE(review): the threshold is kept as "> 1"; confirm whether a
            # single existing row should already count as annotated.
            if len(result) > 1:
                output_stream = len(result)
            else:
                output_stream = json_data_record
        elif OPERATION_MODE == "insert":
            elasticsearch_id = str(json_data_record["meta.docid"]) + "_" + str(json_data_record["nlp.id"])
            query = (
                "INSERT INTO annotations (elasticsearch_id) VALUES ("
                + _sql_string_literal(elasticsearch_id)
                + ")"
            )
            connect_and_query(query, db_file_path, sql_script_mode=True)
            output_stream = json_data_record
    except Exception:
        # Best effort: record the failure and still emit a JSON result.
        _log_exception(log_file_path)

    return output_stream
import sqlite3
from contextlib import closing


def connect_and_query(query, db_file_path, sql_script_mode=False, params=None):
    """Run SQL against the SQLite database file and return fetched rows.

    Args:
        query (string): a single SQL statement, or a script of several
            statements when ``sql_script_mode`` is True.
        db_file_path (string): path of the SQLite database file; SQLite
            creates the file when it does not exist.
        sql_script_mode (bool): run ``query`` with ``executescript`` instead
            of ``execute``. No rows are fetched in this mode.
        params (sequence or dict, optional): values bound to the ``?`` or
            ``:name`` placeholders of ``query``. Only valid when
            ``sql_script_mode`` is False.

    Returns:
        list: the fetched rows as tuples; always empty in script mode.

    Raises:
        ValueError: if ``params`` is given together with ``sql_script_mode``.
        sqlite3.Error: propagated unchanged, so the concrete subclass
            (e.g. ``sqlite3.OperationalError``) and traceback are preserved.
    """
    if sql_script_mode and params is not None:
        raise ValueError("params cannot be used when sql_script_mode is True")

    result = []
    # closing() guarantees the cursor and connection are released on errors.
    with closing(sqlite3.connect(db_file_path)) as sqlite_connection:
        with closing(sqlite_connection.cursor()) as cursor:
            if sql_script_mode:
                cursor.executescript(query)
            else:
                cursor.execute(query, () if params is None else params)
                result = cursor.fetchall()
        # Persist any data-modifying statement before the connection closes.
        sqlite_connection.commit()

    return result


def check_db_exists(table_name, db_file_path):
    """Return the column descriptions of ``table_name``.

    Args:
        table_name (string): name of the table to inspect.
        db_file_path (string): path of the SQLite database file.

    Returns:
        list: one row per column as reported by ``PRAGMA table_info``; an
        empty list when the table does not exist.
    """
    # Quote the name as an identifier so it cannot terminate the statement.
    quoted_name = '"' + str(table_name).replace('"', '""') + '"'
    query = "PRAGMA table_info(" + quoted_name + ");"
    return connect_and_query(query=query, db_file_path=db_file_path)


def create_db_from_file(sqlite_file_path, db_file_path):
    """Execute the SQL script stored in ``sqlite_file_path`` on the database.

    Args:
        sqlite_file_path (string): path of the file holding the SQL script.
        db_file_path (string): path of the SQLite database file.

    Returns:
        list: always empty, because script mode fetches no rows.
    """
    with open(sqlite_file_path, mode="r", encoding="utf-8") as sql_file:
        query = sql_file.read()
    return connect_and_query(query=query, db_file_path=db_file_path, sql_script_mode=True)
VARCHAR NULL, - label VARCHAR(255) NOT NULL, + label VARCHAR(255), label_id VARCHAR(10), source_value VARCHAR, accuracy DECIMAL, @@ -22,18 +22,18 @@ CREATE TABLE annotations ( snomed VARCHAR, "type" VARCHAR(255), medcat_version VARCHAR, - model_id_used INTEGER REFERENCES nlp_models + model_id_used INTEGER REFERENCES nlp_models NULL ); -CREATE TABLE meta_annotations ( - id BIGINT PRIMARY KEY, - annotation_id BIGINT REFERENCES annotations, +CREATE TABLE IF NOT EXISTS meta_annotations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + annotation_id INTEGER REFERENCES annotations NULL, "value" VARCHAR, confidence DECIMAL, "name" VARCHAR ); -CREATE TABLE nlp_models ( +CREATE TABLE IF NOT EXISTS nlp_models ( id BIGINT PRIMARY KEY, "name" VARCHAR NOT NULL, tag VARCHAR,