forked from Azure-Samples/ms-identity-python-webapp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
105 lines (88 loc) · 4.13 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import uuid
import requests
from flask import Flask, render_template, session, request, redirect, url_for
from flask_session import Session # https://pythonhosted.org/Flask-Session
import msal
import app_config
# Create the Flask application, load its settings from the app_config module,
# and wrap it with the Flask-Session extension.
app = Flask(__name__)
app.config.from_object(app_config)
Session(app)
# This section is needed for url_for("foo", _external=True) to automatically
# generate the http scheme when this sample is running on localhost,
# and to generate the https scheme when it is deployed behind a reverse proxy.
# See also https://flask.palletsprojects.com/en/1.0.x/deploying/wsgi-standalone/#proxy-setups
from werkzeug.middleware.proxy_fix import ProxyFix
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1, x_host=1)
@app.route("/")
def index():
    """Show the home page to a signed-in user; otherwise go to the login page.

    A user counts as signed in when ``session["user"]`` holds a truthy value.
    """
    user = session.get("user")
    if user:
        return render_template('index.html', user=user, version=msal.__version__)
    return redirect(url_for("login"))
@app.route("/login")
def login():
    """Render the login page with a freshly built authorization URL.

    A new random ``state`` value is stored in the session on every visit so
    that the redirect handler can compare it with the one echoed back.
    """
    state = str(uuid.uuid4())
    session["state"] = state
    # Technically an empty list [] of scopes would be enough to just sign in;
    # here we choose to also collect end user consent upfront.
    auth_url = _build_auth_url(scopes=app_config.SCOPE, state=state)
    return render_template(
        "login.html", auth_url=auth_url, version=msal.__version__)
@app.route(app_config.REDIRECT_PATH)  # Its absolute URL must match your app's redirect_uri set in AAD
def authorized():
    """Handle the redirect back from the authorization endpoint.

    The ``state`` query parameter must equal the value stored in the session
    by the login view; otherwise the request is ignored and the browser is
    sent back to the index page.  On an ``error`` parameter the error page is
    rendered.  On a ``code`` parameter the code is redeemed through MSAL, the
    returned ``id_token_claims`` are stored in ``session["user"]`` and the
    token cache is persisted to the session.

    Returns:
        A redirect to the index page, or the rendered ``auth_error.html``
        page when either the request or the token response carries an error.
    """
    expected_state = session.get("state")
    # A missing session state must never validate: without this guard a
    # request that simply omits ``state`` would compare None with None and
    # be accepted, allowing an unsolicited authorization code to be redeemed.
    if not expected_state or request.args.get("state") != expected_state:
        return redirect(url_for("index"))  # No-OP. Goes back to Index page
    # The state matched; drop it so the same value cannot be replayed.
    session.pop("state", None)

    if "error" in request.args:  # Authentication/Authorization failure
        return render_template("auth_error.html", result=request.args)

    code = request.args.get("code")
    if code:
        cache = _load_cache()
        result = _build_msal_app(cache=cache).acquire_token_by_authorization_code(
            code,
            scopes=app_config.SCOPE,  # Misspelled scope would cause an HTTP 400 error here
            redirect_uri=url_for("authorized", _external=True))
        if "error" in result:
            return render_template("auth_error.html", result=result)
        session["user"] = result.get("id_token_claims")
        _save_cache(cache)
    return redirect(url_for("index"))
@app.route("/logout")
def logout():
    """Clear the local session, then redirect to the authority's logout URL.

    The logout URL carries the absolute URL of the index page as its
    ``post_logout_redirect_uri`` query parameter.
    """
    # Wipe out user and its token cache from session
    session.clear()
    # Also logout from your tenant's web session
    logout_url = (
        app_config.AUTHORITY + "/oauth2/v2.0/logout" +
        "?post_logout_redirect_uri=" + url_for("index", _external=True))
    return redirect(logout_url)
@app.route("/graphcall")
def graphcall():
    """Call the configured downstream endpoint with a cached access token.

    Looks up a token for ``app_config.SCOPE`` in the session's token cache.
    When no usable access token is available the browser is redirected to the
    login page; otherwise ``app_config.ENDPOINT`` is fetched with a bearer
    token and its JSON body is rendered with ``display.html``.

    Raises:
        requests.exceptions.Timeout: if the endpoint does not respond within
            the timeout below.
    """
    token = _get_token_from_cache(app_config.SCOPE)
    # Treat a result without an access token the same as no result at all,
    # rather than failing with a KeyError while building the header.
    if not token or "access_token" not in token:
        return redirect(url_for("login"))
    response = requests.get(  # Use token to call downstream service
        app_config.ENDPOINT,
        headers={'Authorization': 'Bearer ' + token['access_token']},
        # Without a timeout requests waits indefinitely, which would tie up
        # the worker serving this request if the endpoint stops responding.
        timeout=30,
    )
    graph_data = response.json()
    return render_template('display.html', result=graph_data)
def _load_cache():
    """Return a token cache populated from the session, if anything is stored.

    A new ``msal.SerializableTokenCache`` is always created; it is filled from
    ``session["token_cache"]`` only when that entry is present and truthy.
    """
    token_cache = msal.SerializableTokenCache()
    serialized = session.get("token_cache")
    if serialized:
        token_cache.deserialize(serialized)
    return token_cache
def _save_cache(cache):
    """Write *cache* back to ``session["token_cache"]`` if it has changed.

    Nothing is written when ``cache.has_state_changed`` is falsy.
    """
    if not cache.has_state_changed:
        return
    session["token_cache"] = cache.serialize()
def _build_msal_app(cache=None, authority=None):
    """Create an MSAL confidential client from the application settings.

    Args:
        cache: Token cache handed to the client as ``token_cache``.
        authority: Authority to use; falls back to ``app_config.AUTHORITY``
            when omitted or falsy.

    Returns:
        A new ``msal.ConfidentialClientApplication``.
    """
    client_id = app_config.CLIENT_ID
    chosen_authority = authority or app_config.AUTHORITY
    return msal.ConfidentialClientApplication(
        client_id,
        authority=chosen_authority,
        client_credential=app_config.CLIENT_SECRET,
        token_cache=cache,
    )
def _build_auth_url(authority=None, scopes=None, state=None):
    """Build the authorization request URL that starts the sign-in flow.

    Args:
        authority: Optional authority passed through to ``_build_msal_app``.
        scopes: Scopes to request; an empty list is used when omitted or falsy.
        state: State value to embed; a random UUID string is generated when
            omitted or falsy.

    Returns:
        The URL produced by MSAL's ``get_authorization_request_url``, using
        the absolute URL of the ``authorized`` view as redirect URI.
    """
    msal_app = _build_msal_app(authority=authority)
    requested_scopes = scopes or []
    request_state = state or str(uuid.uuid4())
    callback_url = url_for("authorized", _external=True)
    return msal_app.get_authorization_request_url(
        requested_scopes,
        state=request_state,
        redirect_uri=callback_url,
    )
def _get_token_from_cache(scope=None):
    """Try to obtain a token for *scope* silently from the session's cache.

    Returns:
        Whatever ``acquire_token_silent`` returns for the first cached
        account, or ``None`` when the cache holds no accounts.
    """
    cache = _load_cache()  # This web app maintains one cache per session
    msal_app = _build_msal_app(cache=cache)
    known_accounts = msal_app.get_accounts()
    if not known_accounts:
        return None
    # So all account(s) belong to the current signed-in user
    token = msal_app.acquire_token_silent(scope, account=known_accounts[0])
    _save_cache(cache)
    return token
# Make the URL builder callable from within Jinja templates.
app.jinja_env.globals["_build_auth_url"] = _build_auth_url  # Used in template

# Start Flask's built-in server only when this file is executed directly.
if __name__ == "__main__":
    app.run()