2024-11-27 22:09:33 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								import  logging  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-11-21 02:01:58 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								from  typing  import  Any ,  Dict ,  Generator ,  List ,  Optional ,  Sequence ,  Union  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  urllib . parse  import  parse_qs ,  urlparse  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  langchain_core . documents  import  Document  
						 
					
						
							
								
									
										
										
										
											2024-11-27 22:09:33 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								from  open_webui . env  import  SRC_LOG_LEVELS  
						 
					
						
							
								
									
										
										
										
											2024-11-21 02:01:58 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2024-11-27 22:09:33 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								log  =  logging . getLogger ( __name__ )  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								log . setLevel ( SRC_LOG_LEVELS [ " RAG " ] )  
						 
					
						
							
								
									
										
										
										
											2024-11-21 02:01:58 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								ALLOWED_SCHEMES  =  { " http " ,  " https " }  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								ALLOWED_NETLOCS  =  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " youtu.be " , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " m.youtube.com " , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " youtube.com " , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " www.youtube.com " , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " www.youtube-nocookie.com " , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " vid.plus " , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  _parse_video_id ( url :  str )  - >  Optional [ str ] :  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    """ Parse a YouTube URL and return the video ID if valid, otherwise None. """ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    parsed_url  =  urlparse ( url ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  parsed_url . scheme  not  in  ALLOWED_SCHEMES : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        return  None 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  parsed_url . netloc  not  in  ALLOWED_NETLOCS : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        return  None 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    path  =  parsed_url . path 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  path . endswith ( " /watch " ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        query  =  parsed_url . query 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        parsed_query  =  parse_qs ( query ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  " v "  in  parsed_query : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            ids  =  parsed_query [ " v " ] 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            video_id  =  ids  if  isinstance ( ids ,  str )  else  ids [ 0 ] 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        else : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            return  None 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    else : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        path  =  parsed_url . path . lstrip ( " / " ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        video_id  =  path . split ( " / " ) [ - 1 ] 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  len ( video_id )  !=  11 :   # Video IDs are 11 characters long 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        return  None 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  video_id 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  YoutubeLoader :  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    """ Load `YouTube` video transcripts. """ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    def  __init__ ( 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        video_id :  str , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        language :  Union [ str ,  Sequence [ str ] ]  =  " en " , 
							 
						 
					
						
							
								
									
										
										
										
											2024-11-27 22:09:33 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								        proxy_url :  Optional [ str ]  =  None , 
							 
						 
					
						
							
								
									
										
										
										
											2024-11-21 02:01:58 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								    ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        """ Initialize with YouTube video ID. """ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        _video_id  =  _parse_video_id ( video_id ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . video_id  =  _video_id  if  _video_id  is  not  None  else  video_id 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . _metadata  =  { " source " :  video_id } 
							 
						 
					
						
							
								
									
										
										
										
											2024-11-27 22:09:33 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								        self . proxy_url  =  proxy_url 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 23:06:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 22:24:27 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        # Ensure language is a list 
							 
						 
					
						
							
								
									
										
										
										
											2024-11-21 02:01:58 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								        if  isinstance ( language ,  str ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            self . language  =  [ language ] 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        else : 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 23:11:03 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								            self . language  =  list ( language ) 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 23:06:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Add English as fallback if not already in the list 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  " en "  not  in  self . language : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            self . language . append ( " en " ) 
							 
						 
					
						
							
								
									
										
										
										
											2024-11-21 02:01:58 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    def  load ( self )  - >  List [ Document ] : 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 01:57:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        """ Load YouTube transcripts into `Document` objects. """ 
							 
						 
					
						
							
								
									
										
										
										
											2024-11-21 02:01:58 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								        try : 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 01:57:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								            from  youtube_transcript_api  import  ( 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                NoTranscriptFound , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                TranscriptsDisabled , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                YouTubeTranscriptApi , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        except  ImportError : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            raise  ImportError ( 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                ' Could not import  " youtube_transcript_api "  Python package.  ' 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                " Please install it with `pip install youtube-transcript-api`. " 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            ) 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 23:06:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								    
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 01:57:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        if  self . proxy_url : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            youtube_proxies  =  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                " http " :  self . proxy_url , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                " https " :  self . proxy_url , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            } 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 02:08:25 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								            # Don't log complete URL because it might contain secrets 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            log . debug ( f " Using proxy URL:  { self . proxy_url [ : 14 ] } ... " ) 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 01:57:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        else : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            youtube_proxies  =  None 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 22:30:18 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 01:57:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        try : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            transcript_list  =  YouTubeTranscriptApi . list_transcripts ( 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                self . video_id ,  proxies = youtube_proxies 
							 
						 
					
						
							
								
									
										
										
										
											2024-11-21 02:01:58 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								            ) 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 01:57:06 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        except  Exception  as  e : 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 01:57:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								            log . exception ( " Loading YouTube transcript failed " ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            return  [ ] 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 22:30:18 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 01:57:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        # Try each language in order of priority 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 23:06:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        for  lang  in  self . language : 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 01:57:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								            try : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                transcript  =  transcript_list . find_transcript ( [ lang ] ) 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 02:00:10 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								                log . debug ( f " Found transcript for language  ' { lang } ' " ) 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 01:57:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								                transcript_pieces :  List [ Dict [ str ,  Any ] ]  =  transcript . fetch ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                transcript_text  =  "   " . join ( 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                    map ( 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                        lambda  transcript_piece :  transcript_piece . text . strip ( "   " ) , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                        transcript_pieces , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                    ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                return  [ Document ( page_content = transcript_text ,  metadata = self . _metadata ) ] 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 02:03:00 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								            except  NoTranscriptFound : 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 01:57:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								                log . debug ( f " No transcript found for language  ' { lang } ' " ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            except  Exception  as  e : 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 02:40:48 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								                log . info ( f " Error finding transcript for language  ' { lang } ' " ) 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 01:57:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								                raise  e 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 23:06:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								    
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 22:22:40 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        # If we get here, all languages failed 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 23:06:21 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        languages_tried  =  " ,  " . join ( self . language ) 
							 
						 
					
						
							
								
									
										
										
										
											2025-05-06 23:14:00 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								        log . warning ( f " No transcript found for any of the specified languages:  { languages_tried } . Verify if the video has transcripts, add more languages if needed. " ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        raise  NoTranscriptFound ( f " No transcript found for any supported language. Verify if the video has transcripts, add more languages if needed. " )