Spaces:
Runtime error
Runtime error
| from threading import Thread | |
| from multiprocessing import Queue | |
| from typing import Tuple, Dict, List | |
| from collections import defaultdict | |
| import logging | |
| import sys | |
# Logging setup: records at INFO level and above are written to stdout,
# each stamped with the time, the emitting logger's name and the severity.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class TextFilterer(Thread):
    """Worker thread that filters a per-session stream of texts.

    Reads ``(text, session_id)`` tuples from ``text_queue``, keeps a short
    rolling history of the texts seen for each session, and puts
    ``(text, session_id)`` on ``filtered_text_queue`` whenever
    :meth:`filter_text` selects a text from that history.
    """

    def __init__(
        self,
        text_queue: "Queue[Tuple[str, str]]",
        filtered_text_queue: "Queue[Tuple[str, str]]",
    ):
        """Store the queues and create the empty per-session history.

        Args:
            text_queue: Source of ``(text, session_id)`` tuples.
            filtered_text_queue: Destination for the ``(text, session_id)``
                tuples that pass the filter.
        """
        super().__init__()
        self.text_queue = text_queue
        self.filtered_text_queue = filtered_text_queue
        self.daemon = True  # Thread will exit when main program exits
        # Most recent texts per session id, oldest first.
        self.text_buffers: Dict[str, List[str]] = defaultdict(list)
        # Number of texts kept per session; nothing is emitted until the
        # session's buffer holds this many entries.
        self.max_buffer_size = 5

    # The return annotation is quoted (like the queue annotations above) so
    # that defining the class does not evaluate ``str | None``, which raises
    # TypeError on Python versions older than 3.10.
    def filter_text(self, text: str, session_id: str) -> "str | None":
        """Record ``text`` for a session and return a text to forward, if any.

        The text under consideration is the second-most-recent entry of the
        session's buffer (the "candidate"). It is returned only when all of
        the following hold:

        * the buffer already holds ``max_buffer_size`` entries,
        * the candidate is not the empty string,
        * the candidate is strictly longer than the entry before it and at
          least as long as the entry after it,
        * the candidate does not also appear earlier in the buffer.

        Args:
            text: Newly received text.
            session_id: Key selecting which session's buffer to use.

        Returns:
            The candidate text, or ``None`` when nothing should be forwarded.
        """
        buffer = self.text_buffers[session_id]
        buffer.append(text)
        if len(buffer) < self.max_buffer_size:
            return None

        # Trim to the newest ``max_buffer_size`` entries with one slice
        # deletion rather than repeated ``pop(0)`` calls.
        excess = len(buffer) - self.max_buffer_size
        if excess > 0:
            del buffer[:excess]

        previous, candidate, latest = buffer[-3], buffer[-2], buffer[-1]
        if candidate == "":
            return None
        # Diagnostic output goes through the module logger, not stdout.
        logger.debug("Candidate: %s", candidate)

        longer_than_previous = len(previous) < len(candidate)
        not_shorter_than_latest = len(candidate) >= len(latest)
        if not (longer_than_previous and not_shorter_than_latest):
            return None

        # Suppress a candidate that already occurs earlier in the buffer.
        if candidate in buffer[:-2]:
            return None
        return candidate

    def run(self) -> None:
        """Main processing loop.

        Repeatedly takes a ``(text, session_id)`` tuple from ``text_queue``,
        passes it through :meth:`filter_text`, and puts any resulting text on
        ``filtered_text_queue``. The loop never returns; exceptions raised
        while handling an item are logged and the loop moves on.
        """
        while True:
            try:
                # Get text from queue, blocks until text is available
                text, session_id = self.text_queue.get()
                filtered_text = self.filter_text(text, session_id)
                # Forward only when the filter selected a text
                if filtered_text:
                    self.filtered_text_queue.put((filtered_text, session_id))
            except Exception as e:
                # Top-level boundary of the worker: log with traceback and
                # keep serving subsequent items.
                logger.exception("Error processing text: %s", e)