-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
140 lines (117 loc) · 4.67 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# Flask app that uses JWT and SQLite to register, log in, and log out users through REST API requests
'''
Resources:
- https://www.linkedin.com/learning/building-restful-apis-with-flask
- https://camkode.com/posts/implementing-jwt-authentication-in-flask
'''
# Import libraries
from flask import Flask, jsonify, request
import sqlite3
from flask_jwt_extended import JWTManager, jwt_required, create_access_token, get_jwt, get_jwt_identity
from flask_mail import Mail, Message
from dotenv import load_dotenv
import os
# Define a function to create sqlite table
def create_sqlite_table():
    """Create the ``users`` table in ``app.db`` if it does not already exist.

    The table has an integer primary key ``id`` and the required text
    columns ``first_name``, ``last_name``, ``email`` (unique) and
    ``password``.

    Raises:
        sqlite3.Error: If the database cannot be opened or the schema
            statement fails. ``sqlite3.IntegrityError`` is still printed
            and rolled back instead of being raised.
    """
    # Connect before entering the try block: if connect() itself fails there
    # is nothing to close, and the real error propagates instead of being
    # masked by an UnboundLocalError raised from the finally clause.
    connection = sqlite3.connect("app.db", timeout=10)
    try:
        cursor = connection.cursor()
        cursor.executescript('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                first_name TEXT NOT NULL,
                last_name TEXT NOT NULL,
                email TEXT NOT NULL UNIQUE,
                password TEXT NOT NULL
            )
        ''')
        connection.commit()
    except sqlite3.IntegrityError as e:
        print(f"SQLite unique constraint failed: {e}")
        connection.rollback()  # Roll back the transaction to avoid data corruption
    finally:
        connection.close()
# Make sure the users table exists before the app is configured
create_sqlite_table()

# Load environment variables from the .env file placed in the root directory
load_dotenv()

# Flask application and its settings
app = Flask(__name__)
app.config.update(
    JWT_SECRET_KEY=os.getenv('JWT_SECRET'),  # Define JWT_SECRET in the .env file
    MAIL_SERVER='sandbox.smtp.mailtrap.io',  # Free Mailtrap.io account used to test/pretend sending emails
    MAIL_PORT=2525,
    MAIL_USERNAME=os.getenv('MAIL_USERNAME'),  # Define MAIL_USERNAME in the .env file
    MAIL_PASSWORD=os.getenv('MAIL_PASSWORD'),  # Define MAIL_PASSWORD in the .env file
    MAIL_USE_TLS=True,
    MAIL_USE_SSL=False,
)

# JWT and mail extensions, plus the in-memory set of token ids revoked by /logout
jwt = JWTManager(app)
blacklisted_tokens = set()
mail = Mail(app)
# Define routes
@app.route("/")
def ok():
    """Root endpoint: reply with a JSON "200 OK" message and status 200."""
    payload = jsonify({"message": "200 OK"})
    return payload, 200
@app.route('/register', methods=['POST'])
def register():
    """Create a new user from the submitted form fields.

    Reads the form fields ``email``, ``first_name``, ``last_name`` and
    ``password`` from the request.

    Returns:
        A JSON message with status 201 when the user was created, or with
        status 409 when the email address is already registered.
    """
    email = request.form['email']
    connection = sqlite3.connect("app.db", timeout=10)
    try:
        cursor = connection.cursor()
        existing = cursor.execute(
            "SELECT id FROM users WHERE email = ?", (email,)
        ).fetchone()
        if existing:
            return jsonify(message='That email already exists.'), 409

        first_name = request.form['first_name']
        last_name = request.form['last_name']
        # NOTE(review): the password is stored as plain text. Hashing it here
        # must be done together with the login and password-retrieval
        # endpoints, which compare and read the stored value directly.
        password = request.form['password']
        try:
            cursor.execute(
                "INSERT INTO users (first_name, last_name, email, password) VALUES (?,?,?,?)",
                (first_name, last_name, email, password),
            )
            connection.commit()
        except sqlite3.IntegrityError:
            # Another request registered the same email between the SELECT
            # and the INSERT; the UNIQUE constraint on email rejects it.
            connection.rollback()
            return jsonify(message='That email already exists.'), 409
        return jsonify(message='User created successfully.'), 201
    finally:
        # Always release the connection, including on the 409 path and when
        # a required form field is missing.
        connection.close()
@app.route('/login', methods=['POST'])
def login():
    """Check the submitted credentials and issue a JWT access token.

    Accepts ``email`` and ``password`` either as a JSON object or as form
    fields.

    Returns:
        A JSON message with an ``access_token`` when the credentials match a
        stored user, status 401 when they do not, or status 400 when a JSON
        body does not contain both fields.
    """
    if request.is_json:
        payload = request.json
        # A JSON body without both keys would otherwise raise a plain
        # KeyError/TypeError and surface as a 500 instead of a client error.
        if not isinstance(payload, dict) or 'email' not in payload or 'password' not in payload:
            return jsonify(message="Email and password are required."), 400
        email = payload['email']
        password = payload['password']
    else:
        email = request.form['email']
        password = request.form['password']

    connection = sqlite3.connect("app.db", timeout=10)
    try:
        match = connection.execute(
            "SELECT id FROM users WHERE email = ? AND password = ?",
            (email, password),
        ).fetchone()
    finally:
        # The connection was previously never closed on any path.
        connection.close()

    if match:
        access_token = create_access_token(identity=email)
        return jsonify(message="Login succeeded.", access_token=access_token)
    return jsonify(message="Bad email or password"), 401
@app.route('/logout', methods=['POST'])
@jwt_required()
def logout():
    """Revoke the caller's token by adding its ``jti`` to the blacklist set."""
    claims = get_jwt()
    # The jti claim is the unique identifier of this JWT
    blacklisted_tokens.add(claims['jti'])
    return jsonify(message='User logged out successfully'), 200
@app.route('/protected', methods=['GET'])
@jwt_required()
def protected():
    """Return the identity stored in the caller's token.

    Responds with status 401 when the token's ``jti`` is in the blacklist
    set, otherwise with the token identity and status 200.
    """
    token_id = get_jwt()['jti']
    if token_id not in blacklisted_tokens:
        return jsonify({'logged_in_as': get_jwt_identity()}), 200
    return jsonify(message='Token has been revoked'), 401
@app.route('/retrieve_password/<string:email>', methods=['GET'])
def retrieve_password(email: str):
    """Email the stored password of the given account to its address.

    Args:
        email: Address taken from the URL path; used both to look up the
            user and as the recipient of the message.

    Returns:
        A JSON confirmation when the message was sent, or a JSON message
        with status 401 when no user has that email address.
    """
    connection = sqlite3.connect("app.db", timeout=10)
    try:
        # Select the column by name instead of relying on its position in
        # the table definition.
        row = connection.execute(
            "SELECT password FROM users WHERE email = ?", (email,)
        ).fetchone()
    finally:
        # The connection was previously never closed.
        connection.close()

    if row is None:
        return jsonify(message="That email doesn't exist"), 401

    # The first positional argument of Message is the subject, so the
    # password used to be sent in the subject line with an empty body.
    # NOTE(review): sending a stored plain-text password by email is unsafe;
    # a reset-link flow should replace this endpoint.
    msg = Message(
        subject="Password retrieval",
        body="Your password is " + row[0],
        sender="example@example.com",
        recipients=[email],
    )
    mail.send(msg)
    return jsonify(message="Password sent to " + email)
# Start the Flask development server (debug mode on) when this file is executed directly
if __name__ == "__main__":
    app.run(debug=True)