Spaces:
Runtime error
Runtime error
| from .utils.auth_funcs import * | |
| from .utils.functools import * | |
| from .utils.JWTBearer import * | |
| from backend.models import * | |
| from backend.services.db.utils.DBQueries import DBQueries | |
| from backend.core.Exceptions import * | |
| from backend.core.ExceptionHandlers import * | |
| from backend.core.ConfigEnv import config | |
| from backend import app | |
| from fastapi import HTTPException, BackgroundTasks | |
| from pydantic import ValidationError | |
| from jose import jwt | |
# Public base URL of the deployed service. ops_signup prefixes it to
# "/auth/verify/<token>" when building the email-verification link.
# For local development, use the loopback address below instead:
# BASE_URL = "http://127.0.0.1:8000"
BASE_URL = "https://caffeinecrew-techdocs.hf.space"
async def ops_signup(bgtasks: BackgroundTasks, response_result: GeneralResponse, data: UserAuth):
    """Register a new account and email the user a verification link.

    The ``auth`` row is inserted only after the verification email request
    has returned 200. It is written with an empty ``email`` and
    ``is_verified`` set to 0.

    Args:
        bgtasks: BackgroundTasks. Accepted as part of the signature; this
            function does not use it.
        response_result: GeneralResponse. Mutated in place: on success its
            ``status`` and ``message`` are overwritten.
        data: UserAuth. Prospective credentials of the new user; the
            ``username``, ``email`` and ``password`` attributes are read.

    Raises:
        ExistingUserException: If the ``auth`` table already holds a row
            matching the requested username or email.
        EmailNotSentException: If ``post_request`` to the mail server returns
            anything other than 200.
    """
    # NOTE(review): the filter is assembled by interpolating user-supplied
    # values; if DBQueries forwards it verbatim into SQL this is injectable.
    # Confirm against DBQueries and parameterize there if so.
    duplicate_filter = f"username='{data.username}' OR email='{data.email}'"
    matches = DBQueries.fetch_data_from_database('auth', ['username', 'email'], duplicate_filter)
    if list(matches):
        # An account with this username or email is already on record.
        raise ExistingUserException(response_result)

    # The token subject is "<username> <email>"; ops_verify_email splits it back apart.
    token = Auth.create_access_token(f"{data.username} {data.email}", secret_name='VERIFICATION')
    mail_payload = {
        "recipients": [data.email],
        "subject": "Welcome to Techdocs:[Account Verification]",
        "template_name": "email_verification.html",
        "template_kwargs": {
            "username": data.username,
            "verify_link": f"{BASE_URL}/auth/verify/{token}",
        },
    }
    if post_request(url=config.MAIL_SERVER_URL, data=mail_payload, headers=None) != 200:
        raise EmailNotSentException()

    # The email column stays empty here; ops_verify_email fills it in once
    # the user follows the link.
    DBQueries.insert_to_database(
        'auth',
        (data.username, Auth.get_password_hash(data.password), "", 0),
        ['username', 'password', 'email', 'is_verified'],
    )

    response_result.status = 'success'
    response_result.message = [f'Activate your account by clicking on the link sent to {data.email}.\nMake sure to check your spam folder.']
def ops_login(data: LoginCreds):
    """Authenticate a user and issue a fresh pair of JWTs.

    Args:
        data: LoginCreds. Username and password submitted by the frontend.

    Returns:
        TokenSchema. Carries an access token and a refresh token, both
        created for ``data.username``.

    Raises:
        InvalidCredentialsException: If no ``auth`` row matches the username,
            or if the password or username check fails.
        EmailNotVerifiedException: If the stored ``is_verified`` flag is falsy.
    """
    response_result = GeneralResponse.get_instance(data={},
                                                   status="not_allowed",
                                                   message=["Not authenticated"]
                                                   )

    # NOTE(review): the filter is built by string interpolation of a
    # user-supplied value; confirm how DBQueries turns it into SQL.
    rows = list(DBQueries.fetch_data_from_database(
        'auth', ['username', 'password', 'is_verified'], f"username='{data.username}'"
    ))
    if not rows:
        # No account with the entered username exists.
        raise InvalidCredentialsException(response_result)

    record = rows[0]
    stored_username, stored_password_hash, is_verified = record[0], record[1], record[2]

    # Both checks must pass. The previous `not A and B` parsed as
    # `(not A) and B`, so a wrong password combined with a failing username
    # check skipped this branch and tokens were issued.
    credentials_ok = (
        Auth.verify_password(data.password, stored_password_hash)
        and Auth.verify_username(data.username, stored_username)
    )
    if not credentials_ok:
        raise InvalidCredentialsException(response_result)

    if not is_verified:
        raise EmailNotVerifiedException()

    # Credentials are valid and the email is verified: hand out the tokens.
    return TokenSchema(access_token=Auth.create_access_token(data.username),
                       refresh_token=Auth.create_access_token(data.username, secret_name='REFRESH'),
                       )
def ops_regenerate_api_key(username: str) -> APIKey:
    """Generate a new API key for ``username`` and persist it.

    If the ``api_key`` table has no row for the user, one is inserted;
    otherwise the existing row's ``apikey`` column is updated.

    Args:
        username: Account the key is generated for.

    Returns:
        APIKey. Wraps the newly generated key.
    """
    user_filter = f"username='{username}'"
    existing_rows = list(DBQueries.fetch_data_from_database('api_key', 'apikey', user_filter))

    if not existing_rows:
        # First key for this user: create the row.
        fresh_key = Auth.generate_api_key(username)
        DBQueries.insert_to_database('api_key', (username, fresh_key), ['username', 'apikey'])
        return APIKey(api_key=fresh_key)

    # A key already exists: overwrite it with the new one.
    rotated = APIKey(api_key=Auth.generate_api_key(username))
    DBQueries.update_data_in_database('api_key', 'apikey', user_filter, rotated.api_key)
    return rotated
def ops_inference(source_code: str, api_key: str, username: str):
    """Check the caller's API key, then generate a docstring via the LLM chain.

    Args:
        source_code: Text passed to the LLM chain as its ``instruction``.
        api_key: API key presented by the caller.
        username: Account whose stored key ``api_key`` must equal.

    Returns:
        Inference. Holds the LLM chain's response in its ``docstr`` field.

    Raises:
        InfoNotFoundException: If ``username`` has no row in ``api_key``.
        InvalidCredentialsException: If the stored key differs from ``api_key``.
    """
    response_result = GeneralResponse.get_instance(data={},
                                                   status="not_allowed",
                                                   message=["Not authenticated"]
                                                   )

    # Materialize the result exactly once. Calling list() on it twice (as the
    # earlier code did) yields an empty list the second time if the query
    # helper returns a one-shot iterator, turning a valid lookup into an
    # IndexError.
    key_rows = list(DBQueries.fetch_data_from_database('api_key', ['apikey'], f"username='{username}'"))
    if not key_rows:
        # No API key is on record for this user.
        raise InfoNotFoundException(response_result, "User not found")
    if key_rows[0][0] != api_key:
        raise InvalidCredentialsException(response_result)

    llm_response = app.state.llmchain.invoke({"instruction": source_code})
    print(llm_response)
    return Inference(docstr=llm_response)
def ops_verify_email(request: Request, response_result: GeneralResponse, token: str):
    """Verify a user's email from the token embedded in the verification link.

    On success the user's ``auth`` row gets ``is_verified`` set and its
    ``email`` column filled with the address carried in the token.

    Args:
        request: Request. Passed into the template context.
        response_result: GeneralResponse. Mutated in place on success
            (``status`` and ``message`` are overwritten).
        token: JWT taken from the verification link; its subject is expected
            to be ``"<username> <email>"``.

    Returns:
        The rendered ``verification_success.html`` template on success.
        ``verification_failure.html`` is rendered instead when the token
        cannot be decoded or validated, is expired, has a subject without a
        space separator, or the account is already verified.

    Raises:
        InfoNotFoundException: If no ``auth`` row exists for the username
            named in the token.
    """
    def render(template_name: str):
        # Every outcome renders a template with the same minimal context.
        return app.state.templates.TemplateResponse(template_name, context={"request": request})

    try:
        payload = jwt.decode(
            token, config.JWT_VERIFICATION_SECRET_KEY, algorithms=[config.ALGORITHM]
        )
        token_data = TokenPayload(**payload)
        if datetime.fromtimestamp(token_data.exp) < datetime.now():
            return render("verification_failure.html")

        # partition() gives the same pieces as split(' ', maxsplit=1) when a
        # space is present, but lets a malformed subject be rejected cleanly
        # instead of raising an uncaught ValueError on unpacking.
        username, separator, email = token_data.sub.partition(' ')
        if not separator:
            return render("verification_failure.html")

        verification_rows = list(
            DBQueries.fetch_data_from_database('auth', ['is_verified'], f"username='{username}'")
        )
        if not verification_rows:
            raise InfoNotFoundException(response_result, "User not found")

        already_verified = verification_rows[0][0]
        print(already_verified)
        if already_verified:
            # The link was already used; do not verify twice.
            return render("verification_failure.html")

        # NOTE(review): the flag is passed as a 1-tuple while the email is
        # passed bare; confirm which form update_data_in_database expects.
        DBQueries.update_data_in_database('auth', 'is_verified', f"username='{username}'", (True,))
        DBQueries.update_data_in_database('auth', 'email', f"username='{username}'", email)

        response_result.status = 'success'
        response_result.message = [f'Email verified successfully']
        return render("verification_success.html")
    except (jwt.JWTError, ValidationError):
        return render("verification_failure.html")