# This paste expires on 2023-05-28 07:22:22.497192. Repaste, or download this paste. Pasted through web.

import os
import random
import secrets
from datetime import timedelta

import requests
from flask import Flask, jsonify, request
from flask_bcrypt import Bcrypt
from flask_jwt_extended import (
    JWTManager,
    create_access_token,
    create_refresh_token,
    get_jwt,
    get_jwt_identity,
    jwt_required,
)
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import exc
# Lifetime passed as expires_delta when tokens are created
expiry_time = timedelta(hours=1)

app = Flask(__name__)
bcrypt = Bcrypt(app)
jwt = JWTManager(app)

# Application settings; the connection string and both secrets are read
# from environment variables
app.config.update(
    {
        "SQLALCHEMY_DATABASE_URI": os.environ.get("SQLALCHEMY_DATABASE_URI"),
        "SECRET_KEY": os.environ.get("SECRET_KEY"),
        "JWT_SECRET_KEY": os.environ.get("JWT_SECRET_KEY"),
        "JWT_TOKEN_LOCATION": ["headers", "cookies"],
        "JWT_COOKIE_CSRF_PROTECT": False,
    }
)
db = SQLAlchemy(app)

# JTIs of revoked tokens, held in this process's memory
token_blacklist = set()
# Callback function to check if a JWT's jti is in the in-memory token blacklist
@jwt.token_in_blocklist_loader
def check_if_token_is_revoked(jwt_header, jwt_payload: dict) -> bool:
    """Report whether the presented JWT has been revoked.

    Args:
        jwt_header: Header of the JWT being checked (unused).
        jwt_payload: Claims of the JWT being checked; its ``jti`` is looked
            up in ``token_blacklist``.

    Returns:
        True if the token's ``jti`` is in ``token_blacklist``, else False.
    """
    app.logger.debug(jwt_payload)
    # The membership test is the answer. The earlier
    # ``token_in_set is not None`` was always True (a bool is never None),
    # which reported every token as revoked.
    return jwt_payload["jti"] in token_blacklist
# User model
class User(db.Model):
    """Account record whose primary key is the e-mail address."""

    email = db.Column(db.String(100), primary_key=True)
    mobile_number = db.Column(db.String(20), nullable=False)
    # Holds the bcrypt hash produced in __init__, not the plain password
    password = db.Column(db.String(100), nullable=False)

    def __init__(self, email, mobile_number, password):
        """Populate the record, hashing ``password`` before storing it."""
        self.email = email
        self.mobile_number = mobile_number
        hashed = bcrypt.generate_password_hash(password)
        self.password = hashed.decode()

    def get_id(self):
        """Return the e-mail address, which serves as the identifier."""
        return self.email
# Run db.create_all() at import time, inside an application context
with app.app_context():
    db.create_all()
@app.route("/signup", methods=["POST"])
def signup() -> tuple:
    """Register a new user and return a token pair for them.

    Expects a JSON object with ``email``, ``mobileNumber`` and ``password``.

    Returns:
        A ``(response, status)`` tuple:
        201 with tokens on success, 400 when the body is missing or
        incomplete, 409 when the e-mail is already registered, and 500 when
        the database write fails.
    """
    # silent=True yields None for a missing or unparsable body instead of
    # raising, so the client receives a JSON error like the other paths.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return (
            jsonify(
                {
                    "type": "INVALID_REQUEST",
                    "message": "Request body must be a JSON object",
                }
            ),
            400,
        )
    email = data.get("email")
    mobile_number = data.get("mobileNumber")
    password = data.get("password")
    # Reject incomplete input up front: hashing a missing password raises,
    # and mobile_number is declared NOT NULL on the model.
    if not (
        isinstance(email, str)
        and email
        and isinstance(password, str)
        and password
        and mobile_number
    ):
        return (
            jsonify(
                {
                    "type": "INVALID_REQUEST",
                    "message": "email, mobileNumber and password are required",
                }
            ),
            400,
        )
    duplicate_email = {
        "type": "DUPLICATE_EMAIL",
        "message": "Email is already registered",
    }
    # Check if email is already taken
    if User.query.filter_by(email=email).first():
        return jsonify(duplicate_email), 409
    # Create a new user
    new_user = User(
        email=email,
        mobile_number=mobile_number,
        password=password,
    )
    app.logger.debug(repr(new_user))
    try:
        db.session.add(new_user)
        db.session.commit()
        tokens = get_tokens(new_user)
    except exc.IntegrityError:
        # Most likely a concurrent signup inserted the same e-mail between
        # the check above and the commit (email is the primary key).
        db.session.rollback()
        return jsonify(duplicate_email), 409
    except exc.SQLAlchemyError:
        # Roll back so the session is usable for the next request.
        db.session.rollback()
        app.logger.exception("Failed to create a new user")
        return (
            jsonify(
                {
                    "type": "DB_ERROR",
                    "message": "Failed to create a new user",
                }
            ),
            500,
        )
    return (
        jsonify(
            {
                **tokens,
                "type": "SUCCESS",
                "message": "Signup successful",
            }
        ),
        201,
    )
def generateOTP(digitCount):
    """Return a random numeric one-time password as a string.

    Args:
        digitCount: Number of digits in the OTP; must be at least 1.

    Returns:
        A string of exactly ``digitCount`` decimal digits whose first digit
        is never ``0``.

    Raises:
        ValueError: If ``digitCount`` is less than 1.
    """
    if digitCount < 1:
        raise ValueError("digitCount must be at least 1")
    min_value = 10 ** (digitCount - 1)
    max_value = (10**digitCount) - 1
    # An OTP is an authentication secret, so draw it from ``secrets``
    # (OS-backed CSPRNG) rather than the predictable ``random`` generator.
    span = max_value - min_value + 1
    return str(min_value + secrets.randbelow(span))
@app.route("/otp", methods=["GET"])
@jwt_required()
def otp():
    """Generate a 6-digit OTP and send it by SMS to the authenticated user.

    The user is looked up by the e-mail stored as the JWT identity and the
    message is sent to ``+91`` followed by their stored mobile number.

    Returns:
        200 with the SMS text and OTP when the gateway answers 200,
        404 when the token's user no longer exists, and
        500 when the gateway cannot be reached or answers with another status.
    """
    app.logger.info(
        '""""""""""""""""""""""""""mobile_number""""""""""""""""""""""""""'
    )
    email = get_jwt_identity()
    user = User.query.filter_by(email=email).first()
    if user is None:
        # A valid token can outlive its account; avoid an AttributeError.
        return (
            jsonify({"type": "FAILURE", "message": "User not found"}),
            404,
        )
    mobile_number = user.mobile_number
    app.logger.info(mobile_number)
    # Named otp_code so the local does not shadow this view function.
    otp_code = generateOTP(6)
    link = "https://pgapi.vispl.in/fe/api/v1/send"
    params = {
        # NOTE(review): gateway credentials can now be supplied through the
        # environment. The literals remain only as a fallback so existing
        # deployments keep working; they are exposed in source and should
        # be rotated and removed.
        "username": os.environ.get("SMS_GATEWAY_USERNAME", "irasotpg.trans"),
        "password": os.environ.get("SMS_GATEWAY_PASSWORD", "gzzI2"),
        "unicode": False,
        "from": "IRASUS",
        "to": f"+91{mobile_number}",
        "dltPrincipalEntityId": "1701167402238042327",
        "dltContentId": "1707167447466999347",
        "text": f"Your OTP for login into the iRasus app is {otp_code}. Do NOT disclose the OTP to anybody. iRasus NEVER asks you for your OTP.",
    }
    try:
        # Without a timeout a stalled gateway would block this worker forever.
        response = requests.get(link, params=params, timeout=10)
    except requests.RequestException as err:
        # Log only the exception type: the message may embed the request
        # URL, whose query string carries the credentials and the OTP.
        app.logger.error(
            "SMS gateway request failed: %s", type(err).__name__
        )
        return (
            jsonify({"type": "FAILURE", "message": "Failed to send OTP"}),
            500,
        )
    if response.status_code == 200:
        # NOTE(review): the OTP is returned to the caller as well as sent by
        # SMS. Kept because existing clients may rely on it, but it lets the
        # caller bypass the SMS channel; confirm this is intended.
        return jsonify(
            {"type": "SUCCESS", "message": params["text"], "otp": otp_code}
        )
    return (
        jsonify({"type": "FAILURE", "message": "Failed to send OTP"}),
        500,
    )
@app.route("/login", methods=["POST"])
def login():
    """Authenticate a user by e-mail and password and return a token pair.

    Expects a JSON object with ``email`` and ``password``.

    Returns:
        A ``(response, status)`` tuple:
        200 with tokens on success, 400 when the body is missing or lacks
        string credentials, and 401 when the credentials do not match.
    """
    # silent=True yields None for a missing or unparsable body instead of
    # raising, so the client receives a JSON error like the other paths.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return (
            jsonify(
                {
                    "type": "INVALID_REQUEST",
                    "message": "Request body must be a JSON object",
                }
            ),
            400,
        )
    email = data.get("email")
    password = data.get("password")
    # A missing or non-text password would make the bcrypt check raise and
    # turn a bad request into a 500.
    if not (isinstance(email, str) and isinstance(password, str)):
        return (
            jsonify(
                {
                    "type": "INVALID_REQUEST",
                    "message": "email and password are required",
                }
            ),
            400,
        )
    # Retrieve the user from the database
    user = User.query.filter_by(email=email).first()
    # Unknown user and wrong password give the same answer
    if user is None or not bcrypt.check_password_hash(
        user.password, password
    ):
        return (
            jsonify({"type": "FAILURE", "message": "Invalid credentials"}),
            401,
        )
    response = {
        **get_tokens(user),
        "type": "SUCCESS",
        "message": "Login successful",
    }
    return jsonify(response), 200
def get_tokens(user):
    """Build the access/refresh token pair for ``user``.

    Both tokens use the user's e-mail as identity and ``expiry_time`` as
    their ``expires_delta``.

    Returns:
        A dict with the keys ``access_token`` and ``refresh_token``.
    """
    identity = user.email
    return {
        "access_token": create_access_token(
            identity=identity, expires_delta=expiry_time
        ),
        "refresh_token": create_refresh_token(
            identity=identity, expires_delta=expiry_time
        ),
    }
@app.route("/logout", methods=["POST"])
@jwt_required()
def logout():
    """Revoke the JWT used for this request by recording its ``jti``."""
    # Add the current token's identifier to the blacklist
    token_blacklist.add(get_jwt()["jti"])
    body = {"type": "SUCCESS", "message": "Logout successful"}
    return jsonify(body), 200
@app.route("/get_data", methods=["GET"])
@jwt_required()
def get_data():
    """Return a fixed success payload; the route requires a valid JWT."""
    payload = {"type": "SUCCESS", "message": "Protected data"}
    return jsonify(payload), 200
# Start the Flask server only when this file is executed as a script
if __name__ == "__main__":
    app.run()
# Filename: None. Size: 6kb. View raw, hex, or download this file.