| import os
|
| from datetime import timedelta
|
|
|
| import requests
|
| from flask import Flask, jsonify, request
|
| from flask_bcrypt import Bcrypt
|
| from flask_jwt_extended import (
|
| JWTManager,
|
| create_access_token,
|
| create_refresh_token,
|
| get_jwt,
|
| get_jwt_identity,
|
| jwt_required,
|
| )
|
| from flask_sqlalchemy import SQLAlchemy
|
| from sqlalchemy import exc
|
| import random
|
|
|
| # Calculate the expiry time based on UTC
|
| expiry_time = timedelta(hours=1)
|
|
|
| app = Flask(__name__)
|
| bcrypt = Bcrypt(app)
|
| jwt = JWTManager(app)
|
|
|
| # Configure the database connection
|
| app.config["SQLALCHEMY_DATABASE_URI"] = os.environ.get(
|
| "SQLALCHEMY_DATABASE_URI"
|
| )
|
| app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY")
|
| app.config["JWT_SECRET_KEY"] = os.environ.get("JWT_SECRET_KEY")
|
| app.config["JWT_TOKEN_LOCATION"] = ["headers", "cookies"]
|
| app.config["JWT_COOKIE_CSRF_PROTECT"] = False
|
|
|
| db = SQLAlchemy(app)
|
| # Maintain a token blacklist
|
| token_blacklist = set()
|
|
|
|
|
| # Callback function to check if a JWT exists in the redis blocklist
|
| @jwt.token_in_blocklist_loader
|
| def check_if_token_is_revoked(jwt_header, jwt_payload: dict):
|
| app.logger.debug(jwt_payload)
|
| jti = jwt_payload["jti"]
|
| token_in_set = jti in token_blacklist
|
|
|
| return token_in_set is not None
|
|
|
|
|
| # User model
|
| class User(db.Model):
|
| email = db.Column(db.String(100), primary_key=True)
|
| mobile_number = db.Column(db.String(20), nullable=False)
|
| password = db.Column(db.String(100), nullable=False)
|
|
|
| def __init__(self, email, mobile_number, password):
|
| self.email = email
|
| self.mobile_number = mobile_number
|
| self.password = bcrypt.generate_password_hash(password).decode()
|
|
|
| def get_id(self):
|
| return self.email
|
|
|
|
|
| with app.app_context():
|
| db.create_all()
|
|
|
|
|
| @app.route("/signup", methods=["POST"])
|
| def signup() -> jsonify:
|
| data = request.get_json()
|
| email = data.get("email")
|
| mobile_number = data.get("mobileNumber")
|
| password = data.get("password")
|
|
|
| # Check if email is already taken
|
| if User.query.filter_by(email=email).first():
|
| return (
|
| jsonify(
|
| {
|
| "type": "DUPLICATE_EMAIL",
|
| "message": "Email is already registered",
|
| }
|
| ),
|
| 409,
|
| )
|
| # Create a new user
|
| new_user = User(
|
| email=email,
|
| mobile_number=mobile_number,
|
| password=password,
|
| )
|
| app.logger.debug(repr((new_user)))
|
|
|
| try:
|
| db.session.add(new_user)
|
| db.session.commit()
|
|
|
| return (
|
| jsonify(
|
| {
|
| **get_tokens(new_user),
|
| "type": "SUCCESS",
|
| "message": "Signup successful",
|
| }
|
| ),
|
| 201,
|
| )
|
|
|
| except exc.SQLAlchemyError:
|
| return (
|
| jsonify(
|
| {
|
| "type": "DB_ERROR",
|
| "message": "Failed to create a new user",
|
| }
|
| ),
|
| 500,
|
| )
|
|
|
|
|
| def generateOTP(digitCount):
|
| min_value = 10 ** (digitCount - 1)
|
| max_value = (10**digitCount) - 1
|
| otp = str(random.randint(min_value, max_value))
|
| return otp
|
|
|
|
|
| @app.route("/otp", methods=["GET"])
|
| @jwt_required()
|
| def otp():
|
| app.logger.info(
|
| '""""""""""""""""""""""""""mobile_number""""""""""""""""""""""""""'
|
| )
|
| email = get_jwt_identity()
|
| user = User.query.filter_by(email=email).first()
|
| mobile_number = user.mobile_number
|
| app.logger.info(mobile_number)
|
| otp = generateOTP(6)
|
| link = "https://pgapi.vispl.in/fe/api/v1/send"
|
| params = {
|
| "username": "irasotpg.trans",
|
| "password": "gzzI2",
|
| "unicode": False,
|
| "from": "IRASUS",
|
| "to": f"+91{mobile_number}",
|
| "dltPrincipalEntityId": "1701167402238042327",
|
| "dltContentId": "1707167447466999347",
|
| "text": f"Your OTP for login into the iRasus app is {otp}. Do NOT disclose the OTP to anybody. iRasus NEVER asks you for your OTP.",
|
| }
|
|
|
| response = requests.get(link, params=params)
|
|
|
| if response.status_code == 200:
|
| return jsonify(
|
| {"type": "SUCCESS", "message": params["text"], "otp": otp}
|
| )
|
| else:
|
| return (
|
| jsonify({"type": "FAILURE", "message": "Failed to send OTP"}),
|
| 500,
|
| )
|
|
|
|
|
| @app.route("/login", methods=["POST"])
|
| def login():
|
| data = request.get_json()
|
| email = data.get("email")
|
| password = data.get("password")
|
|
|
| # Retrieve the user from the database
|
| user = User.query.filter_by(email=email).first()
|
|
|
| # Check if user exists and the password matches
|
| if user and bcrypt.check_password_hash(user.password, password):
|
| response = {
|
| **get_tokens(user),
|
| "type": "SUCCESS",
|
| "message": "Login successful",
|
| }
|
|
|
| return jsonify(response), 200
|
| else:
|
| return (
|
| jsonify({"type": "FAILURE", "message": "Invalid credentials"}),
|
| 401,
|
| )
|
|
|
|
|
| def get_tokens(user):
|
| access_token = create_access_token(
|
| identity=user.email, expires_delta=expiry_time
|
| )
|
| refresh_token = create_refresh_token(
|
| identity=user.email, expires_delta=expiry_time
|
| )
|
| response = {
|
| "access_token": access_token,
|
| "refresh_token": refresh_token,
|
| }
|
|
|
| return response
|
|
|
|
|
| @app.route("/logout", methods=["POST"])
|
| @jwt_required()
|
| def logout():
|
| # Extract the JWT from the request
|
| jti = get_jwt()["jti"]
|
|
|
| # Add the JWT to the token blacklist
|
| token_blacklist.add(jti)
|
|
|
| return (
|
| jsonify({"type": "SUCCESS", "message": "Logout successful"}),
|
| 200,
|
| )
|
|
|
|
|
| @app.route("/get_data", methods=["GET"])
|
| @jwt_required()
|
| def get_data():
|
| return jsonify({"type": "SUCCESS", "message": "Protected data"}), 200
|
|
|
|
|
| if __name__ == "__main__":
|
| app.run()
|