Spaces:
				
			
			
	
			
			
					
		Running
		
	
	
	
			
			
	
	
	
	
		
		
					
		Running
		
	| #!/usr/bin/env python | |
| # coding: utf-8 | |
| # # Bibliothek | |
| #!pip install PyMySQL | |
| import pprint | |
| import json | |
| import pymysql.cursors | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.responses import JSONResponse, HTMLResponse | |
| import time | |
| from dbutils.pooled_db import PooledDB | |
| import os | |
| import logging | |
| from fastapi.templating import Jinja2Templates | |
# Deployment-specific settings; both fall back to the test defaults when the
# environment does not override them.
AUSLEIHE_TABLE = os.getenv('AUSLEIHE_TABLE', 'ausleihe_test')
PAGE_TITLE = os.getenv('PAGE_TITLE', 'Murmel Bibliothek TEST')

# Connection settings for the 'murmel' database. Host and credentials are
# mandatory: a missing variable raises KeyError at import time.
db_config = dict(
    host=os.environ['MURMEL_DB_HOST'],
    user=os.environ['MURMEL_DB_USER'],
    password=os.environ['MURMEL_DB_PASSWORD'],
    database='murmel',
    cursorclass=pymysql.cursors.DictCursor,  # rows are returned as dicts
)

# Shared connection pool, capped at 5 connections.
pool = PooledDB(pymysql, maxconnections=5, **db_config)

app = FastAPI()
LOG = logging.getLogger('uvicorn.error')
def execute_query(sql, params=None, max_retries=3, retry_delay=1):
    """
    Run a single SQL statement on a pooled connection and return all rows.

    The statement is committed after its rows have been fetched. When the
    driver raises ``pymysql.OperationalError`` the whole attempt is repeated
    with a connection newly requested from the pool, after sleeping
    ``retry_delay`` seconds.

    Parameters:
    - sql (str): The SQL statement, using ``%s`` placeholders for values.
    - params (tuple | None): Values bound to the placeholders (default: none).
    - max_retries (int): Maximum number of attempts; values below 1 are
      treated as 1 so the statement is always tried at least once.
    - retry_delay (float): Seconds to sleep between two attempts.
    Returns:
    - The result of ``cursor.fetchall()`` for the statement.
    Raises:
    - HTTPException: Status 500 once every attempt failed with an
      OperationalError; the driver error is chained as the cause.
    """
    # Lazy %-formatting: the message is only built when DEBUG is enabled.
    LOG.debug("executing query %s with params %s", sql, params)
    attempts = max(1, max_retries)
    for attempt in range(1, attempts + 1):
        # Reset per attempt so `finally` never closes a connection that
        # belongs to an earlier, already cleaned-up attempt.
        connection = None
        try:
            connection = pool.connection()
            with connection.cursor() as cursor:
                cursor.execute(sql, params or ())
                result = cursor.fetchall()
            connection.commit()
            return result
        except pymysql.OperationalError as e:
            if attempt == attempts:
                LOG.error("query failed after %d attempt(s): %s", attempts, e)
                raise HTTPException(status_code=500, detail="Database connection error") from e
            LOG.warning("database error on attempt %d/%d, retrying: %s", attempt, attempts, e)
            time.sleep(retry_delay)
        finally:
            if connection is not None:
                connection.close()
# Template loader rooted at "." (a relative path, so presumably resolved
# against the process working directory - confirm for the deployment).
templates = Jinja2Templates(directory=".")
def get_groups():
    """Return name and id of every group flagged as current, ordered by id."""
    return execute_query(
        "SELECT Gruppe, idGruppe FROM `gruppe` WHERE aktuell is True ORDER BY idGruppe ASC;"
    )
def get_students(idGruppe):
    """
    List the children enrolled in the current school year.

    Parameters:
    - idGruppe: Either the literal string 'all' (every child, de-duplicated)
      or a value matched against both the group name and the group id.
    Returns:
    - The rows from execute_query, each with the columns `Kind` (first and
      last name joined by a space) and `idKind`, ordered by `Kind`.
    """
    if idGruppe != 'all':
        # Restrict to one group; the same value is bound twice because it may
        # be either the group's name or its id.
        group_sql = """
        SELECT concat(`ki`.`Vorname`,' ',`ki`.`Nachnamen`) AS `Kind`, `ki`.`idKind` AS `idKind`
        FROM `kind` `ki`
        JOIN `kind_x_gruppe_x_schuljahr` `kgs` ON `ki`.`idKind` = `kgs`.`x_kind`
        JOIN `schuljahr` `sch` ON `sch`.`idschuljahr` = `kgs`.`x_schuljahr`
        JOIN `gruppe` `gr` ON `gr`.`idGruppe` = `kgs`.`x_gruppe`
        WHERE `sch`.`aktuell` = 1 AND (`gr`.`Gruppe` = %s OR `gr`.`idGruppe` = %s)
        ORDER BY 1
        """
        return execute_query(group_sql, (idGruppe, idGruppe))

    # No group filter: DISTINCT collapses children that appear in several groups.
    everyone_sql = """
    SELECT DISTINCT concat(`ki`.`Vorname`,' ',`ki`.`Nachnamen`) AS `Kind`, `ki`.`idKind` AS `idKind`
    FROM `kind` `ki`
    JOIN `kind_x_gruppe_x_schuljahr` `kgs` ON `ki`.`idKind` = `kgs`.`x_kind`
    JOIN `schuljahr` `sch` ON `sch`.`idschuljahr` = `kgs`.`x_schuljahr`
    WHERE `sch`.`aktuell` = 1
    ORDER BY 1
    """
    return execute_query(everyone_sql)
def rueckgabe(idBuch, grund='rueckgabe'):
    """
    Mark every open loan of a book as returned.

    Parameters:
    - idBuch (int): The ID of the book to be returned.
    - grund (str): The reason stored with the return (default: 'rueckgabe').
    Returns:
    - dict: On success, ``success`` is True and ``student_names`` lists the
      full names of the children who had the book; ``message`` names them.
      When no open loan exists, ``success`` is False and only ``message``
      is present.
    """
    # Look up who currently holds the book (loans without a return date).
    open_loans_sql = f"""
    SELECT a.`idBuch`, a.`idKind`, a.`ausleihe`, a.`rueckgabe`, a.`rueckGrund`,
    CONCAT(k.`Vorname`, ' ', k.`Nachnamen`) AS student_name
    FROM `{AUSLEIHE_TABLE}` a
    JOIN `kind` k ON a.`idKind` = k.`idKind`
    WHERE a.`idBuch` = %s AND a.`rueckgabe` is NULL;
    """
    open_loans = execute_query(open_loans_sql, (idBuch,))
    if len(open_loans) == 0:
        return {"success": False, "message": "Buch nicht gefunden oder bereits zurückgegeben"}

    # Close the open loans, stamping the return time and the reason.
    close_sql = f"UPDATE `{AUSLEIHE_TABLE}` SET `rueckgabe` = NOW(), `rueckGrund` = %s WHERE `idBuch` = %s AND `rueckgabe` is NULL;"
    execute_query(close_sql, (grund, idBuch))

    student_names = [loan['student_name'] for loan in open_loans]
    return {
        "success": True,
        "message": "Buch zurückgegeben von: " + ", ".join(student_names),
        "student_names": student_names,
    }
def ausleihe(idBuch, idKind):
    """
    Lend a book to a child by inserting a new row into the table named by
    AUSLEIHE_TABLE.

    Any loan of the book that is still open is closed first through
    rueckgabe() with the reason "neu-ausleihe"; the names of those previous
    borrowers are appended to the success message.

    Parameters:
    - idBuch (int): The ID of the book being loaned.
    - idKind (int): The ID of the child borrowing the book.
    Returns:
    - dict: ``success`` (bool) and ``message`` (str). Errors are not raised;
      they are logged and reported as ``success: False``.
    """
    try:
        rueckgabe_result = rueckgabe(idBuch, grund="neu-ausleihe")
        message = "Buch erfolgreich ausgeliehen"

        # rueckgabe() already returns the previous borrowers' full names, so
        # they are used directly. They are names, not idKind values: looking
        # them up by id finds nothing and would abort the loan after the old
        # one has already been closed.
        previous_borrowers = rueckgabe_result.get("student_names") or []
        if rueckgabe_result.get("success", False) and previous_borrowers:
            message += f". Zuvor ausgeliehen von {', '.join(previous_borrowers)}"

        # Insert new borrowing record
        sql = f"INSERT INTO `{AUSLEIHE_TABLE}` (`idBuch`, `idKind`, `ausleihe`) VALUES (%s, %s, NOW());"
        execute_query(sql, (idBuch, idKind))
        return {"success": True, "message": message}
    except Exception as e:
        # Deliberately broad: every failure is turned into an error result
        # instead of propagating. The traceback is kept in the log.
        LOG.exception("Error in ausleihe: %s", e)
        return {"success": False, "message": f"Fehler beim Ausleihen des Buches: {str(e)}"}
def ausgeliehen(idKind):
    """
    Look up the books a child currently has on loan.

    Args:
        idKind (int): The ID of the child.
    Returns:
        The rows returned by execute_query, one per loan that has no return
        date yet, each carrying `idBuch` and the loan timestamp `ausleihe`.
    """
    open_loans_sql = f"SELECT `idBuch`, `ausleihe` FROM `{AUSLEIHE_TABLE}` WHERE `idKind` = %s AND `rueckgabe` IS NULL;"
    return execute_query(open_loans_sql, (idKind,))
async def read_root(request: Request):
    """Render index.html, passing the request and the configured page title."""
    # NOTE(review): no route decorator is visible on this handler in this
    # view of the file - confirm it is registered on `app`.
    context = {"request": request, "page_title": PAGE_TITLE}
    return templates.TemplateResponse("index.html", context)
# Start a server on localhost:5000 when the module is executed as a script.
if __name__ == '__main__':
    import uvicorn

    uvicorn.run(app, host='localhost', port=5000)
| # %% | |