Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manage Whatsapp Flows #1224

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions kairon/api/app/routers/bot/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,128 @@ async def initiate_platform_onboarding(
return Response(message='Channel added', data=channel_endpoint)


@router.post("/whatsapp/flows/{bsp_type}", response_model=Response)
async def add_whatsapp_flow(
request_data: DictData,
bsp_type: str = Path(description="Business service provider type",
examples=[WhatsappBSPTypes.bsp_360dialog.value]),
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS)
):
"""
Adds whatsapp flows for configured bsp account. New Flows are created as drafts.
"""
provider = BusinessServiceProviderFactory.get_instance(bsp_type)(current_user.get_bot(), current_user.get_user())
response = provider.add_whatsapp_flow(request_data.data, current_user.get_bot(), current_user.get_user())
return Response(data=response)


@router.post("/whatsapp/flows/{bsp_type}/{flow_id}", response_model=Response)
async def edit_whatsapp_flow(
request_data: DictData,
flow_id: str = Path(description="flow id", examples=["594425479261596"]),
bsp_type: str = Path(description="Business service provider type",
examples=[WhatsappBSPTypes.bsp_360dialog.value]),
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS)
):
"""
Edits whatsapp flows for configured bsp account. New Flows are created as drafts.
"""
provider = BusinessServiceProviderFactory.get_instance(bsp_type)(current_user.get_bot(), current_user.get_user())
response = provider.edit_whatsapp_flow(flow_id, request_data.data.get("flow_json"))
return Response(data=response)


@router.get("/whatsapp/flows/{bsp_type}/{flow_id}", response_model=Response)
async def preview_whatsapp_flow(
flow_id: str = Path(description="flow id", examples=["594425479261596"]),
bsp_type: str = Path(description="Business service provider type",
examples=[WhatsappBSPTypes.bsp_360dialog.value]),
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS)
):
"""
Flows can be previewed through a public link generated with this endpoint.
"""
provider = BusinessServiceProviderFactory.get_instance(bsp_type)(current_user.get_bot(), current_user.get_user())
response = provider.preview_whatsapp_flow(flow_id)
return Response(data=response)


@router.get("/whatsapp/flows/{bsp_type}/{flow_id}/assets", response_model=Response)
async def get_whatsapp_flow_assets(
flow_id: str = Path(description="flow id", examples=["594425479261596"]),
bsp_type: str = Path(description="Business service provider type",
examples=[WhatsappBSPTypes.bsp_360dialog.value]),
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS)
):
"""
Returns all assets attached to a specified flow with this endpoint.
"""
provider = BusinessServiceProviderFactory.get_instance(bsp_type)(current_user.get_bot(), current_user.get_user())
response = provider.get_whatsapp_flow_assets(flow_id)
return Response(data=response)


@router.post("/whatsapp/flows/{bsp_type}/{flow_id}/deprecate", response_model=Response)
async def deprecate_whatsapp_flow(
flow_id: str = Path(description="flow id", examples=["594425479261596"]),
bsp_type: str = Path(description="Business service provider type",
examples=[WhatsappBSPTypes.bsp_360dialog.value]),
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS)
):
"""
Flow can be deprecated with this endpoint.
"""
provider = BusinessServiceProviderFactory.get_instance(bsp_type)(current_user.get_bot(), current_user.get_user())
response = provider.deprecate_whatsapp_flow(flow_id)
return Response(data=response)


@router.get("/whatsapp/flows/{bsp_type}", response_model=Response)
async def retrieve_whatsapp_flows(
request: Request,
bsp_type: str = Path(description="Business service provider type",
examples=[WhatsappBSPTypes.bsp_360dialog.value]),
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS)
):
"""
Retrieves all whatsapp flows for configured bsp account.
Query parameters passed are used as filters while retrieving these flows.
"""
provider = BusinessServiceProviderFactory.get_instance(bsp_type)(current_user.get_bot(), current_user.get_user())
flows = provider.list_whatsapp_flows(**request.query_params)
return Response(data={"flows": flows})


@router.delete("/whatsapp/flows/{bsp_type}/{flow_id}", response_model=Response)
async def delete_whatsapp_flow(
flow_id: str = Path(description="flow id", examples=["594425479261596"]),
bsp_type: str = Path(description="Business service provider type",
examples=[WhatsappBSPTypes.bsp_360dialog.value]),
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS)
):
"""
Deletes whatsapp flow from configured bsp account.
"""
provider = BusinessServiceProviderFactory.get_instance(bsp_type)(current_user.get_bot(), current_user.get_user())
response = provider.delete_flow(flow_id)
return Response(data=response)


@router.post("/whatsapp/flows/{bsp_type}/{flow_id}/publish", response_model=Response)
async def publish_whatsapp_flow(
flow_id: str = Path(description="flow id", examples=["594425479261596"]),
bsp_type: str = Path(description="Business service provider type",
examples=[WhatsappBSPTypes.bsp_360dialog.value]),
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS)
):
"""
Publishes whatsapp flow from configured bsp account.
"""
provider = BusinessServiceProviderFactory.get_instance(bsp_type)(current_user.get_bot(), current_user.get_user())
response = provider.publish_flow(flow_id)
return Response(data=response)


@router.post("/whatsapp/templates/{bsp_type}", response_model=Response)
async def add_message_templates(
request_data: DictData,
Expand Down
149 changes: 149 additions & 0 deletions kairon/shared/channels/whatsapp/bsp/dialog360.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import ast
import json
from typing import Text, Dict

from loguru import logger
Expand Down Expand Up @@ -82,6 +83,154 @@ def save_channel_config(self, clientId: Text, client: Text, channels: list, part
}
return ChatDataProcessor.save_channel_config(conf, self.bot, self.user)

def get_flow_endpoint_url(self):
config = ChatDataProcessor.get_channel_config(ChannelTypes.WHATSAPP.value, self.bot, mask_characters=False)
partner_id = Utility.environment["channels"]["360dialog"]["partner_id"]
waba_account_id = config.get("config", {}).get("waba_account_id")
base_url = Utility.system_metadata["channels"]["whatsapp"]["business_providers"]["360dialog"][
"hub_base_url"]
flow_endpoint = f'/api/v2/partners/{partner_id}/waba_accounts/{waba_account_id}/flows'
return base_url, flow_endpoint

def add_whatsapp_flow(self, data: Dict, bot: Text, user: Text):
try:
Utility.validate_add_flow_request(data)
template_name = data.pop('template')
flow_json = self.get_flow_json_from_template(template_name)

base_url, flow_endpoint = self.get_flow_endpoint_url()
headers = {"Authorization": BSP360Dialog.get_partner_auth_token()}
url = f"{base_url}{flow_endpoint}"
resp = Utility.execute_http_request(request_method="POST", http_url=url, request_body=data, headers=headers,
validate_status=True, err_msg="Failed to add flow: ",
expected_status_code=201)
if not data.get('clone_flow_id'):
flow_id = resp["id"]
self.edit_whatsapp_flow(flow_id, flow_json)
UserActivityLogger.add_log(a_type=UserActivityType.flow_creation.value, email=user, bot=bot,
message=['Flow created!'])
return resp
except DoesNotExist as e:
logger.exception(e)
raise AppException("Channel not found!")

@staticmethod
def get_flow_json_from_template(template_name):
with open("metadata/flows/default_meta_flows.json", 'r') as file:
content = json.load(file)
flow_json = {}
for template in content["data"]["xfb_wa_flows_creation_options"]["templates"]:
if template_name == template['id']:
flow_json = template['flow_json']
break
return flow_json

@staticmethod
def write_flow_json_into_file(flow_json):
with open("metadata/flows/flow_json.json", 'w') as file:
json.dump(json.loads(flow_json), file, indent=4)

def edit_whatsapp_flow(self, flow_id, flow_json):
from starlette.datastructures import UploadFile

try:
base_url, flow_endpoint = self.get_flow_endpoint_url()
headers = {"Authorization": BSP360Dialog.get_partner_auth_token()}
url = f"{base_url}{flow_endpoint}/{flow_id}/assets"
request_body = {
"asset_type": "FLOW_JSON",
"name": "flow.json"
}
self.write_flow_json_into_file(flow_json)

file = UploadFile(filename="flow_json.json", file=(open("metadata/flows/flow_json.json", "rb")))

resp = Utility.execute_http_request(request_method="POST", http_url=url, request_body=request_body,
headers=headers, validate_status=True, err_msg="Failed to edit flow: ",
expected_status_code=200,
files={'file': (file.filename, file.file, file.content_type)})
return resp
except DoesNotExist as e:
logger.exception(e)
raise AppException("Channel not found!")
except Exception as e:
logger.exception(e)
raise AppException(str(e))

def preview_whatsapp_flow(self, flow_id: str):
try:
base_url, flow_endpoint = self.get_flow_endpoint_url()
headers = {"Authorization": BSP360Dialog.get_partner_auth_token()}
url = f"{base_url}{flow_endpoint}/{flow_id}/preview"
resp = Utility.execute_http_request(request_method="GET", http_url=url, headers=headers,
validate_status=True, err_msg="Failed to get flow: ")
return resp
except DoesNotExist as e:
logger.exception(e)
raise AppException("Channel not found!")

def get_whatsapp_flow_assets(self, flow_id: str):
try:
base_url, flow_endpoint = self.get_flow_endpoint_url()
headers = {"Authorization": BSP360Dialog.get_partner_auth_token()}
url = f"{base_url}{flow_endpoint}/{flow_id}/assets"
resp = Utility.execute_http_request(request_method="GET", http_url=url, headers=headers,
validate_status=True, err_msg="Failed to get flow assets: ")
return resp
except DoesNotExist as e:
logger.exception(e)
raise AppException("Channel not found!")

def deprecate_whatsapp_flow(self, flow_id: str):
try:
base_url, flow_endpoint = self.get_flow_endpoint_url()
headers = {"Authorization": BSP360Dialog.get_partner_auth_token()}
url = f"{base_url}{flow_endpoint}/{flow_id}/deprecate"
resp = Utility.execute_http_request(request_method="POST", http_url=url, headers=headers,
validate_status=True, err_msg="Failed to deprecate flow: ")
return resp
except DoesNotExist as e:
logger.exception(e)
raise AppException("Channel not found!")

def list_whatsapp_flows(self, **kwargs):
fields = kwargs.get("fields")

try:
base_url, flow_endpoint = self.get_flow_endpoint_url()
headers = {"Authorization": BSP360Dialog.get_partner_auth_token()}
url = f"{base_url}{flow_endpoint}?fields={fields}" if fields else f"{base_url}{flow_endpoint}"
resp = Utility.execute_http_request(request_method="GET", http_url=url, headers=headers,
validate_status=True, err_msg="Failed to get flows: ")
return resp
except DoesNotExist as e:
logger.exception(e)
raise AppException("Channel not found!")

def delete_flow(self, flow_id: str):
try:
base_url, flow_endpoint = self.get_flow_endpoint_url()
headers = {"Authorization": BSP360Dialog.get_partner_auth_token()}
url = f"{base_url}{flow_endpoint}/{flow_id}"
resp = Utility.execute_http_request(request_method="DELETE", http_url=url, headers=headers,
validate_status=True, err_msg="Failed to delete flow: ")
return resp
except DoesNotExist as e:
logger.exception(e)
raise AppException("Channel not found!")

def publish_flow(self, flow_id: str):
try:
base_url, flow_endpoint = self.get_flow_endpoint_url()
headers = {"Authorization": BSP360Dialog.get_partner_auth_token()}
url = f"{base_url}{flow_endpoint}/{flow_id}/publish"
resp = Utility.execute_http_request(request_method="POST", http_url=url, headers=headers,
validate_status=True, err_msg="Failed to publish flow: ")
return resp
except DoesNotExist as e:
logger.exception(e)
raise AppException("Channel not found!")

def add_template(self, data: Dict, bot: Text, user: Text):
try:
Utility.validate_create_template_request(data)
Expand Down
22 changes: 22 additions & 0 deletions kairon/shared/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class UserActivityType(str, Enum):
invalid_login = 'invalid_login'
download = "download"
template_creation = 'template_creation'
flow_creation = "flow_creation"
model_reload = "model_reload"


Expand Down Expand Up @@ -131,6 +132,27 @@ class WhatsappBSPTypes(str, Enum):
bsp_360dialog = "360dialog"


class FlowCategories(str, Enum):
SIGN_UP = "SIGN_UP"
SIGN_IN = "SIGN_IN"
APPOINTMENT_BOOKING = "APPOINTMENT_BOOKING"
LEAD_GENERATION = "LEAD_GENERATION"
CONTACT_US = "CONTACT_US"
CUSTOMER_SUPPORT = "CUSTOMER_SUPPORT"
SURVEY = "SURVEY"
OTHER = "OTHER"


class FlowTemplates(str, Enum):
FLOWS_DEFAULT = "FLOWS_DEFAULT"
FLOWS_OFFSITE_CALL_TO_ACTION = "FLOWS_OFFSITE_CALL_TO_ACTION"
FLOWS_CUSTOMER_SATISFACTION = "FLOWS_CUSTOMER_SATISFACTION"
FLOWS_LEAD_RE_ENGAGEMENT = "FLOWS_LEAD_RE_ENGAGEMENT"
FLOWS_CONTENT_ENGAGEMENT = "FLOWS_CONTENT_ENGAGEMENT"
FLOWS_REQUEST_SUPPORT = "FLOWS_REQUEST_SUPPORT"
FLOWS_UPDATE_PREFERENCES = "FLOWS_UPDATE_PREFERENCES"


class GPT3ResourceTypes(str, Enum):
embeddings = "embeddings"
chat_completion = "chat/completions"
Expand Down
35 changes: 27 additions & 8 deletions kairon/shared/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
from websockets import connect

from .actions.models import ActionParameterType
from .constants import EventClass, UserActivityType
from .constants import EventClass, UserActivityType, FlowCategories, FlowTemplates
from .constants import (
MaskingStrategy,
SYSTEM_TRIGGERED_UTTERANCES,
Expand Down Expand Up @@ -1300,6 +1300,22 @@ def reload_model(bot: Text, email: Text):
data={"username": email, "exception": exc, "status": status},
)

@staticmethod
def validate_add_flow_request(data: Dict):
required_keys = ['name', 'categories', 'template']
missing_keys = [key for key in required_keys if key not in data]
if missing_keys:
raise AppException(f'Missing {", ".join(missing_keys)} in request body!')
categories = data.get('categories')
template = data.get('template')
invalid_categories = [category for category in categories
if category not in [flow_category.value for flow_category in FlowCategories]]
invalid_template = template if template not in [flow_template.value for flow_template in FlowTemplates] else ""
if invalid_categories:
raise AppException(f'Invalid categories {", ".join(invalid_categories)} in request body!')
if invalid_template:
raise AppException(f'Invalid template {template} in request body!')
Comment on lines +1304 to +1317
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure proper validation of categories and template in validate_add_flow_request.

The method validate_add_flow_request checks for missing keys, invalid categories, and templates. However, it does not handle cases where categories or template might be empty or have incorrect types. Consider adding checks to ensure these fields are not empty and are of the expected type.

+        if not categories or not isinstance(categories, list):
+            raise AppException('Categories must be a non-empty list!')
+        if not template or not isinstance(template, str):
+            raise AppException('Template must be a non-empty string!')
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
def validate_add_flow_request(data: Dict):
required_keys = ['name', 'categories', 'template']
missing_keys = [key for key in required_keys if key not in data]
if missing_keys:
raise AppException(f'Missing {", ".join(missing_keys)} in request body!')
categories = data.get('categories')
template = data.get('template')
invalid_categories = [category for category in categories
if category not in [flow_category.value for flow_category in FlowCategories]]
invalid_template = template if template not in [flow_template.value for flow_template in FlowTemplates] else ""
if invalid_categories:
raise AppException(f'Invalid categories {", ".join(invalid_categories)} in request body!')
if invalid_template:
raise AppException(f'Invalid template {template} in request body!')
def validate_add_flow_request(data: Dict):
required_keys = ['name', 'categories', 'template']
missing_keys = [key for key in required_keys if key not in data]
if missing_keys:
raise AppException(f'Missing {", ".join(missing_keys)} in request body!')
categories = data.get('categories')
template = data.get('template')
if not categories or not isinstance(categories, list):
raise AppException('Categories must be a non-empty list!')
if not template or not isinstance(template, str):
raise AppException('Template must be a non-empty string!')
invalid_categories = [category for category in categories
if category not in [flow_category.value for flow_category in FlowCategories]]
invalid_template = template if template not in [flow_template.value for flow_template in FlowTemplates] else ""
if invalid_categories:
raise AppException(f'Invalid categories {", ".join(invalid_categories)} in request body!')
if invalid_template:
raise AppException(f'Invalid template {template} in request body!')


@staticmethod
def validate_create_template_request(data: Dict):
required_keys = ["name", "category", "components", "language"]
Expand Down Expand Up @@ -1695,13 +1711,16 @@ def execute_http_request(
timeout=kwargs.get("timeout"),
)
elif request_method.lower() in ["post", "put", "patch"]:
response = session.request(
request_method.upper(),
http_url,
json=request_body,
headers=headers,
timeout=kwargs.get("timeout"),
)
if kwargs.get('files'):
response = session.request(
request_method.upper(), http_url, data=request_body, headers=headers,
timeout=kwargs.get('timeout'), files=kwargs.get('files')
)
else:
response = session.request(
request_method.upper(), http_url, json=request_body, headers=headers,
timeout=kwargs.get('timeout')
)
Comment on lines +1714 to +1723
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handle file uploads in HTTP requests more robustly.

The method execute_http_request handles file uploads but does not check if the files parameter is of the correct type. This could lead to errors if an incorrect type is passed. Consider adding a type check for the files parameter.

+        if kwargs.get('files') and not isinstance(kwargs.get('files'), dict):
+            raise AppException('Files must be provided as a dictionary!')
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
if kwargs.get('files'):
response = session.request(
request_method.upper(), http_url, data=request_body, headers=headers,
timeout=kwargs.get('timeout'), files=kwargs.get('files')
)
else:
response = session.request(
request_method.upper(), http_url, json=request_body, headers=headers,
timeout=kwargs.get('timeout')
)
if kwargs.get('files'):
if not isinstance(kwargs.get('files'), dict):
raise AppException('Files must be provided as a dictionary!')
response = session.request(
request_method.upper(), http_url, data=request_body, headers=headers,
timeout=kwargs.get('timeout'), files=kwargs.get('files')
)
else:
response = session.request(
request_method.upper(), http_url, json=request_body, headers=headers,
timeout=kwargs.get('timeout')
)

else:
raise AppException("Invalid request method!")
logger.debug("raw response: " + str(response.text))
Expand Down
Loading
Loading