| 
									
										
										
										
											2024-11-27 22:09:33 +08:00
										 |  |  | import logging | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-21 02:01:58 +08:00
										 |  |  | from typing import Any, Dict, Generator, List, Optional, Sequence, Union | 
					
						
							|  |  |  | from urllib.parse import parse_qs, urlparse | 
					
						
							|  |  |  | from langchain_core.documents import Document | 
					
						
							| 
									
										
										
										
											2024-11-27 22:09:33 +08:00
										 |  |  | from open_webui.env import SRC_LOG_LEVELS | 
					
						
							| 
									
										
										
										
											2024-11-21 02:01:58 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-27 22:09:33 +08:00
										 |  |  | log = logging.getLogger(__name__) | 
					
						
							|  |  |  | log.setLevel(SRC_LOG_LEVELS["RAG"]) | 
					
						
							| 
									
										
										
										
											2024-11-21 02:01:58 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | ALLOWED_SCHEMES = {"http", "https"} | 
					
						
							|  |  |  | ALLOWED_NETLOCS = { | 
					
						
							|  |  |  |     "youtu.be", | 
					
						
							|  |  |  |     "m.youtube.com", | 
					
						
							|  |  |  |     "youtube.com", | 
					
						
							|  |  |  |     "www.youtube.com", | 
					
						
							|  |  |  |     "www.youtube-nocookie.com", | 
					
						
							|  |  |  |     "vid.plus", | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def _parse_video_id(url: str) -> Optional[str]: | 
					
						
							|  |  |  |     """Parse a YouTube URL and return the video ID if valid, otherwise None.""" | 
					
						
							|  |  |  |     parsed_url = urlparse(url) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if parsed_url.scheme not in ALLOWED_SCHEMES: | 
					
						
							|  |  |  |         return None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if parsed_url.netloc not in ALLOWED_NETLOCS: | 
					
						
							|  |  |  |         return None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     path = parsed_url.path | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if path.endswith("/watch"): | 
					
						
							|  |  |  |         query = parsed_url.query | 
					
						
							|  |  |  |         parsed_query = parse_qs(query) | 
					
						
							|  |  |  |         if "v" in parsed_query: | 
					
						
							|  |  |  |             ids = parsed_query["v"] | 
					
						
							|  |  |  |             video_id = ids if isinstance(ids, str) else ids[0] | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             return None | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         path = parsed_url.path.lstrip("/") | 
					
						
							|  |  |  |         video_id = path.split("/")[-1] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if len(video_id) != 11:  # Video IDs are 11 characters long | 
					
						
							|  |  |  |         return None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return video_id | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class YoutubeLoader: | 
					
						
							|  |  |  |     """Load `YouTube` video transcripts.""" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__( | 
					
						
							|  |  |  |         self, | 
					
						
							|  |  |  |         video_id: str, | 
					
						
							|  |  |  |         language: Union[str, Sequence[str]] = "en", | 
					
						
							| 
									
										
										
										
											2024-11-27 22:09:33 +08:00
										 |  |  |         proxy_url: Optional[str] = None, | 
					
						
							| 
									
										
										
										
											2024-11-21 02:01:58 +08:00
										 |  |  |     ): | 
					
						
							|  |  |  |         """Initialize with YouTube video ID.""" | 
					
						
							|  |  |  |         _video_id = _parse_video_id(video_id) | 
					
						
							|  |  |  |         self.video_id = _video_id if _video_id is not None else video_id | 
					
						
							|  |  |  |         self._metadata = {"source": video_id} | 
					
						
							| 
									
										
										
										
											2024-11-27 22:09:33 +08:00
										 |  |  |         self.proxy_url = proxy_url | 
					
						
							| 
									
										
										
										
											2025-05-06 23:06:21 +08:00
										 |  |  |          | 
					
						
							| 
									
										
										
										
											2025-05-06 22:24:27 +08:00
										 |  |  |         # Ensure language is a list | 
					
						
							| 
									
										
										
										
											2024-11-21 02:01:58 +08:00
										 |  |  |         if isinstance(language, str): | 
					
						
							|  |  |  |             self.language = [language] | 
					
						
							|  |  |  |         else: | 
					
						
							| 
									
										
										
										
											2025-05-06 23:06:21 +08:00
										 |  |  |             self.language = list(language)  # Make a copy to avoid modifying the original | 
					
						
							|  |  |  |          | 
					
						
							|  |  |  |         # Add English as fallback if not already in the list | 
					
						
							|  |  |  |         if "en" not in self.language: | 
					
						
							|  |  |  |             self.language.append("en") | 
					
						
							| 
									
										
										
										
											2024-11-21 02:01:58 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def load(self) -> List[Document]: | 
					
						
							| 
									
										
										
										
											2025-05-06 01:57:21 +08:00
										 |  |  |         """Load YouTube transcripts into `Document` objects.""" | 
					
						
							| 
									
										
										
										
											2024-11-21 02:01:58 +08:00
										 |  |  |         try: | 
					
						
							| 
									
										
										
										
											2025-05-06 01:57:21 +08:00
										 |  |  |             from youtube_transcript_api import ( | 
					
						
							|  |  |  |                 NoTranscriptFound, | 
					
						
							|  |  |  |                 TranscriptsDisabled, | 
					
						
							|  |  |  |                 YouTubeTranscriptApi, | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |         except ImportError: | 
					
						
							|  |  |  |             raise ImportError( | 
					
						
							|  |  |  |                 'Could not import "youtube_transcript_api" Python package. ' | 
					
						
							|  |  |  |                 "Please install it with `pip install youtube-transcript-api`." | 
					
						
							|  |  |  |             ) | 
					
						
							| 
									
										
										
										
											2025-05-06 23:06:21 +08:00
										 |  |  |      | 
					
						
							| 
									
										
										
										
											2025-05-06 01:57:21 +08:00
										 |  |  |         if self.proxy_url: | 
					
						
							|  |  |  |             youtube_proxies = { | 
					
						
							|  |  |  |                 "http": self.proxy_url, | 
					
						
							|  |  |  |                 "https": self.proxy_url, | 
					
						
							|  |  |  |             } | 
					
						
							| 
									
										
										
										
											2025-05-06 02:08:25 +08:00
										 |  |  |             # Don't log complete URL because it might contain secrets | 
					
						
							|  |  |  |             log.debug(f"Using proxy URL: {self.proxy_url[:14]}...") | 
					
						
							| 
									
										
										
										
											2025-05-06 01:57:21 +08:00
										 |  |  |         else: | 
					
						
							|  |  |  |             youtube_proxies = None | 
					
						
							| 
									
										
										
										
											2025-05-06 22:30:18 +08:00
										 |  |  |          | 
					
						
							| 
									
										
										
										
											2025-05-06 01:57:21 +08:00
										 |  |  |         try: | 
					
						
							|  |  |  |             transcript_list = YouTubeTranscriptApi.list_transcripts( | 
					
						
							|  |  |  |                 self.video_id, proxies=youtube_proxies | 
					
						
							| 
									
										
										
										
											2024-11-21 02:01:58 +08:00
										 |  |  |             ) | 
					
						
							| 
									
										
										
										
											2025-05-06 01:57:06 +08:00
										 |  |  |         except Exception as e: | 
					
						
							| 
									
										
										
										
											2025-05-06 01:57:21 +08:00
										 |  |  |             log.exception("Loading YouTube transcript failed") | 
					
						
							|  |  |  |             return [] | 
					
						
							| 
									
										
										
										
											2025-05-06 22:30:18 +08:00
										 |  |  |          | 
					
						
							| 
									
										
										
										
											2025-05-06 01:57:21 +08:00
										 |  |  |         # Try each language in order of priority | 
					
						
							| 
									
										
										
										
											2025-05-06 23:06:21 +08:00
										 |  |  |         for lang in self.language: | 
					
						
							| 
									
										
										
										
											2025-05-06 01:57:21 +08:00
										 |  |  |             try: | 
					
						
							|  |  |  |                 transcript = transcript_list.find_transcript([lang]) | 
					
						
							| 
									
										
										
										
											2025-05-06 02:00:10 +08:00
										 |  |  |                 log.debug(f"Found transcript for language '{lang}'") | 
					
						
							| 
									
										
										
										
											2025-05-06 01:57:21 +08:00
										 |  |  |                 transcript_pieces: List[Dict[str, Any]] = transcript.fetch() | 
					
						
							|  |  |  |                 transcript_text = " ".join( | 
					
						
							|  |  |  |                     map( | 
					
						
							|  |  |  |                         lambda transcript_piece: transcript_piece.text.strip(" "), | 
					
						
							|  |  |  |                         transcript_pieces, | 
					
						
							|  |  |  |                     ) | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |                 return [Document(page_content=transcript_text, metadata=self._metadata)] | 
					
						
							| 
									
										
										
										
											2025-05-06 02:03:00 +08:00
										 |  |  |             except NoTranscriptFound: | 
					
						
							| 
									
										
										
										
											2025-05-06 01:57:21 +08:00
										 |  |  |                 log.debug(f"No transcript found for language '{lang}'") | 
					
						
							|  |  |  |                 continue | 
					
						
							|  |  |  |             except Exception as e: | 
					
						
							| 
									
										
										
										
											2025-05-06 02:40:48 +08:00
										 |  |  |                 log.info(f"Error finding transcript for language '{lang}'") | 
					
						
							| 
									
										
										
										
											2025-05-06 01:57:21 +08:00
										 |  |  |                 raise e | 
					
						
							| 
									
										
										
										
											2025-05-06 23:06:21 +08:00
										 |  |  |      | 
					
						
							| 
									
										
										
										
											2025-05-06 22:22:40 +08:00
										 |  |  |         # If we get here, all languages failed | 
					
						
							| 
									
										
										
										
											2025-05-06 23:06:21 +08:00
										 |  |  |         languages_tried = ", ".join(self.language) | 
					
						
							|  |  |  |         log.warning(f"No transcript found for any of the specified languages: {languages_tried}") | 
					
						
							|  |  |  |         raise NoTranscriptFound(f"No transcript found for any supported language") |