Skip to content

Commit

Permalink
NiFi: added annotation tracking/managing script.
Browse files Browse the repository at this point in the history
  • Loading branch information
vladd-bit committed Oct 6, 2023
1 parent f166bea commit 090bdce
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 27 deletions.
2 changes: 1 addition & 1 deletion deploy/database.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ POSTGRES_DATABANK_DB=cogstack
POSTGRES_DB_MAX_CONNECTIONS=100

# Prefix of file names to load the DB schema for in /services/cogstack-db/(pgsql/mssql)/schemas/ folder
DB_SCHEMA_PREFIX="cogstack_db"
DB_SCHEMA_PREFIX="cogstack_db"
3 changes: 2 additions & 1 deletion nifi/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ devel
**flow
**login
**conf
*.jks
*.jks
**__pycache__
79 changes: 64 additions & 15 deletions nifi/user-scripts/annotation_manager.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,75 @@
import os
import sqlite3
import json
import traceback
import sys
from utils.sqlite_query import connect_and_query,check_db_exists,create_db_from_file

# ---------------------------------------------------------------------------
# Script configuration.
#
# Every setting gets a default here so that main() never hits an unbound
# name; values passed through NiFi's "Command Arguments" property override
# the defaults further down.
# ---------------------------------------------------------------------------

# Legacy names, kept so any remaining references to them keep working.
db_folder = os.getenv("USER_SCRIPT_DB_DIR", "./db/")
db_file_name = "index_name.db"

# SQL script used to create the annotations schema when the DB is empty.
ANNOTATION_DB_SQL_FILE_PATH = "/opt/data/cogstack-db/sqlite/schemas/annotations_nlp_create_schema.sql"

# Folder holding the SQLite file; falls back to "./db/" when the
# USER_SCRIPT_DB_DIR environment variable is not set (previously None, which
# made os.path.join() in main() fail).
USER_SCRIPT_DB_DIR = db_folder

# File name of the SQLite database inside USER_SCRIPT_DB_DIR.
INDEX_DB_FILE_NAME = db_file_name

# Name of the field holding the document ID.
# NOTE(review): "id" mirrors the key main() reads in "check" mode - confirm.
DOCUMENT_ID_FIELD_NAME = "id"

# File name of the error log written by main().
LOG_FILE_NAME = "annotation_manager.log"

# possible values:
# - check - check if a document ID has already been annotated
# - insert - inserts new annotation(s) into DB
OPERATION_MODE = "check"

# Get the arguments from the "Command Arguments" property in NiFi. Each one is
# a "key=value" pair; we only look at what follows the 1st arg (the script name).
for arg in sys.argv[1:]:
    _key, _separator, _value = arg.partition("=")
    if not _separator:
        # Not a key=value pair, so there is no value to assign.
        continue
    if _key == "index_db_file_name":
        INDEX_DB_FILE_NAME = _value
    elif _key == "document_id_field":
        DOCUMENT_ID_FIELD_NAME = _value
    elif _key == "user_script_db_dir":
        USER_SCRIPT_DB_DIR = _value
    elif _key == "log_file_name":
        LOG_FILE_NAME = _value
    elif _key == "operation_mode":
        OPERATION_MODE = _value

def _sql_string_literal(value):
    """Return ``value`` as a single-quoted SQL string literal (quotes doubled)."""
    return "'" + str(value).replace("'", "''") + "'"


def main():
    """Check for, or record, annotations of the JSON record read from stdin.

    Behaviour depends on the module-level ``OPERATION_MODE``:

    * ``"check"``  - looks up rows whose ``elasticsearch_id`` starts with
      ``<document id>_`` (the format written by ``"insert"``). Returns the
      number of matching rows when more than one is found, otherwise the
      parsed input record.
    * ``"insert"`` - stores ``<meta.docid>_<nlp.id>`` as a new row and returns
      the parsed input record.

    The annotations schema is created first if the ``annotations`` table is
    missing.

    Returns:
        The value to serialise to stdout; ``{}`` if anything failed or the
        operation mode is unknown. Failures are appended to the log file
        rather than raised, so the script always emits valid JSON.
    """
    input_stream = sys.stdin.read()
    output_stream = {}

    # Resolved outside the ``try`` so the error handler can always use it;
    # ``globals().get`` tolerates LOG_FILE_NAME not having been set.
    log_file_path = os.path.join(
        str(os.environ.get("USER_SCRIPT_LOGS_DIR", "/opt/nifi/user-scripts/logs/")),
        str(globals().get("LOG_FILE_NAME", "annotation_manager.log")),
    )

    try:
        db_file_path = os.path.join(USER_SCRIPT_DB_DIR, INDEX_DB_FILE_NAME)

        json_data_record = json.loads(input_stream)

        # An empty column list means the table has not been created yet.
        if not check_db_exists("annotations", db_file_path):
            create_db_from_file(ANNOTATION_DB_SQL_FILE_PATH, db_file_path)

        if OPERATION_MODE == "check":
            document_id = str(json_data_record["content"][0]["id"])
            # "%" and "_" are LIKE wildcards, so escape them in order to match
            # the literal "<document id>_" prefix followed by anything.
            escaped_id = (
                document_id.replace("\\", "\\\\")
                .replace("%", "\\%")
                .replace("_", "\\_")
            )
            query = (
                "SELECT * FROM annotations WHERE elasticsearch_id LIKE "
                + _sql_string_literal(escaped_id + "\\_%")
                + " ESCAPE '\\'"
            )
            result = connect_and_query(query, db_file_path)

            # NOTE(review): threshold kept from the original (> 1, not >= 1);
            # confirm whether a single stored annotation should also count.
            if len(result) > 1:
                output_stream = len(result)
            else:
                output_stream = json_data_record
        elif OPERATION_MODE == "insert":
            elasticsearch_id = (
                str(json_data_record["meta.docid"]) + "_" + str(json_data_record["nlp.id"])
            )
            query = (
                "INSERT INTO annotations (elasticsearch_id) VALUES ("
                + _sql_string_literal(elasticsearch_id)
                + ")"
            )
            connect_and_query(query, db_file_path, sql_script_mode=True)
            output_stream = json_data_record

    except Exception:
        # Best effort: never let a failure stop the script from emitting output.
        try:
            # "a+" creates the file when it is missing, so one branch suffices.
            with open(log_file_path, "a+") as log_file:
                log_file.write("\n" + traceback.format_exc())
        except OSError:
            # The log file is unusable; fall back to stderr instead of
            # silently dropping the error.
            traceback.print_exc()

    return output_stream

# Only run when executed as a script (e.g. by NiFi), not when imported.
if __name__ == "__main__":
    sys.stdout.write(json.dumps(main()))
39 changes: 39 additions & 0 deletions nifi/user-scripts/utils/sqlite_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import sqlite3

def connect_and_query(query, db_file_path, sql_script_mode=False, params=None):
    """Run ``query`` against the SQLite database at ``db_file_path``.

    A new connection is opened for the call and always closed afterwards.
    Changes are committed in both modes, so an INSERT/UPDATE issued as a
    single statement is persisted as well.

    Args:
        query (str): a single SQL statement, or a whole SQL script when
            ``sql_script_mode`` is True.
        db_file_path (str): path of the SQLite database file.
        sql_script_mode (bool): run ``query`` with ``executescript`` (several
            statements allowed, no result rows) instead of ``execute``.
        params (sequence | dict | None): values bound to the ``?`` / ``:name``
            placeholders of ``query``. Only valid for single statements.

    Returns:
        list: the fetched rows; always empty in script mode.

    Raises:
        ValueError: if ``params`` is given together with ``sql_script_mode``.
        sqlite3.Error: the original exception (e.g. ``OperationalError``) is
            propagated unchanged, keeping its concrete type and traceback.
    """
    if sql_script_mode and params:
        raise ValueError("params cannot be used together with sql_script_mode")

    result = []
    sqlite_connection = sqlite3.connect(db_file_path)

    try:
        cursor = sqlite_connection.cursor()
        try:
            if sql_script_mode:
                cursor.executescript(query)
            else:
                cursor.execute(query, params or ())
                result = cursor.fetchall()
            # Closing a connection with an open transaction discards it, so
            # commit regardless of the mode.
            sqlite_connection.commit()
        finally:
            cursor.close()
    finally:
        sqlite_connection.close()

    return result

def check_db_exists(table_name, db_file_path):
    """Return the column descriptions of ``table_name`` in the given database.

    Despite its name this checks for a table, not for the database file: the
    result of ``PRAGMA table_info`` is returned, which has one row per column
    and is empty when the table does not exist.

    Args:
        table_name (str): name of the table to inspect.
        db_file_path (str): path of the SQLite database file.

    Returns:
        list: the ``PRAGMA table_info`` rows; an empty list if the table is missing.
    """
    # Quote the name as an identifier (embedded double quotes doubled) so that
    # unusual table names cannot break or alter the statement.
    quoted_table_name = '"' + str(table_name).replace('"', '""') + '"'
    query = "PRAGMA table_info(" + quoted_table_name + ");"
    return connect_and_query(query=query, db_file_path=db_file_path)

def create_db_from_file(sqlite_file_path, db_file_path):
    """Execute the SQL script stored in a file against a SQLite database.

    Args:
        sqlite_file_path (str): path of the file containing the SQL script.
        db_file_path (str): path of the SQLite database file to run it on.

    Returns:
        list: whatever ``connect_and_query`` returns for the script run.
    """
    with open(sqlite_file_path, mode="r") as schema_file:
        schema_sql = schema_file.read()

    # The file may hold several statements, hence script mode.
    return connect_and_query(
        query=schema_sql,
        db_file_path=db_file_path,
        sql_script_mode=True,
    )
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
CREATE TABLE documents (
CREATE TABLE IF NOT EXISTS documents (
id VARCHAR PRIMARY KEY,
document_id VARCHAR NOT NULL,
document_id VARCHAR NOT NULL,
document_text TEXT
);

CREATE TABLE annotations (
id BIGINT PRIMARY KEY,
CREATE TABLE IF NOT EXISTS annotations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
elasticsearch_id VARCHAR NULL,
label VARCHAR(255) NOT NULL,
label VARCHAR(255),
label_id VARCHAR(10),
source_value VARCHAR,
accuracy DECIMAL,
Expand All @@ -22,18 +22,18 @@ CREATE TABLE annotations (
snomed VARCHAR,
"type" VARCHAR(255),
medcat_version VARCHAR,
model_id_used INTEGER REFERENCES nlp_models
model_id_used INTEGER REFERENCES nlp_models NULL
);

CREATE TABLE meta_annotations (
id BIGINT PRIMARY KEY,
annotation_id BIGINT REFERENCES annotations,
CREATE TABLE IF NOT EXISTS meta_annotations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
annotation_id INTEGER REFERENCES annotations NULL,
"value" VARCHAR,
confidence DECIMAL,
"name" VARCHAR
);

CREATE TABLE nlp_models (
CREATE TABLE IF NOT EXISTS nlp_models (
id BIGINT PRIMARY KEY,
"name" VARCHAR NOT NULL,
tag VARCHAR,
Expand Down

0 comments on commit 090bdce

Please sign in to comment.