mirror of https://github.com/jerome158/LIVETV.git
feat: support whitelist keywords and set subscribe whitelist in subscribe.txt
This commit is contained in:
parent
4225f79839
commit
f277ce0dac
@@ -6,4 +6,10 @@ https://raw.githubusercontent.com/iptv-org/iptv/gh-pages/countries/cn.m3u
https://raw.githubusercontent.com/iptv-org/iptv/master/streams/cn.m3u
https://raw.githubusercontent.com/suxuang/myIPTV/main/ipv4.m3u
https://raw.githubusercontent.com/kimwang1978/collect-tv-txt/main/others_output.txt
https://raw.githubusercontent.com/vbskycn/iptv/master/tv/iptv4.txt

[WHITELIST]
# 以下是订阅源的白名单,白名单内的订阅源获取的接口将不会参与测速,始终保留至结果最前。
# This is the whitelist for subscription sources. Subscription sources in the whitelist will not participate in speed testing and will always be retained at the front of the results.
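For illustration only, a subscribe.txt using the new section might look like this (the whitelisted URL below is a placeholder, not from the commit): default subscribe URLs stay at the top, and anything listed under [WHITELIST] is still fetched but skipped during speed testing and kept at the front of the results.

    https://raw.githubusercontent.com/iptv-org/iptv/master/streams/cn.m3u

    [WHITELIST]
    # whitelisted subscribe sources, one per line
    https://example.com/my-trusted-list.m3u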
@@ -1,4 +1,12 @@
# 这是接口或订阅源的白名单,白名单内的接口或订阅源获取的接口将不会参与测速,优先排序至结果最前。
# 这是接口的白名单,白名单内的接口将不会参与测速,始终保留至结果最前。
# 填写频道名称会直接保留该记录至最终结果,如:CCTV-1,接口地址,只填写接口地址则对所有频道生效,多条记录换行输入。
# This is the whitelist of interfaces or subscription sources. Interfaces in the whitelist, or interfaces obtained from whitelisted subscription sources, will not participate in speed testing and will be sorted to the front of the results.
# This is the whitelist for interfaces. Interfaces in the whitelist will not participate in speed testing and will always be retained at the front of the results.
# Filling in a channel name keeps that record directly in the final result, e.g. CCTV-1,url; filling in only the interface address applies it to all channels; enter multiple records on separate lines.

[KEYWORDS]
# 以下是关键字白名单,含有关键字的地址会被加入白名单,换行输入。
# This is the keyword whitelist. Addresses containing a keyword will be added to the whitelist; enter one keyword per line.
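For illustration, a whitelist.txt following the comments above might look like this (all entries are placeholders): a "name,url" line whitelists that url for one channel, a bare url line applies globally, and anything under [KEYWORDS] whitelists every address containing the keyword.

    CCTV-1,http://example.com/cctv1.m3u8
    http://trusted.example.com/live/stream.m3u8

    [KEYWORDS]
    trusted.example.com
    /backup/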
@@ -15,7 +15,9 @@
"msg.update_completed": "\uD83E\uDD73 Update completed! Total time spent: {time}{service_tip}",
"msg.service_tip": ", You can watch it at the following address",
"msg.service_run_success": "✅ Service run successful{service_tip}",
"msg.whitelist_found": "✅ Whitelist found: {count}",
"msg.whitelist_found": "✅ Whitelist interface rules count: {count}",
"msg.blacklist_found": "✅ Blacklist interface rules count: {count}",
"msg.subscribe_urls_whitelist_total": "✅ Default subscribe urls count: {default_count}, whitelist subscribe urls count: {whitelist_count}, total count: {total}",
"msg.error_name_info": "❌ Error on {name}: {info}",
"msg.error_load_cache": "❌ Error loading cache file: {info}",
"msg.error_append_channel_data": "❌ Error appending channel data: {info}",
@@ -15,7 +15,9 @@
"msg.update_completed": "\uD83E\uDD73 更新完成!总耗时:{time}{service_tip}",
"msg.service_tip": ",可使用以下地址进行观看",
"msg.service_run_success": "✅ 服务启动成功{service_tip}",
"msg.whitelist_found": "✅ 白名单数量:{count}",
"msg.whitelist_found": "✅ 白名单接口规则数量:{count}",
"msg.blacklist_found": "✅ 黑名单接口规则数量:{count}",
"msg.subscribe_urls_whitelist_total": "✅ 默认订阅源数量:{default_count},白名单订阅源数量:{whitelist_count},总数量:{total}",
"msg.error_name_info": "❌ {name} 出错:{info}",
"msg.error_load_cache": "❌ 加载缓存文件出错:{info}",
"msg.error_append_channel_data": "❌ 添加频道数据出错:{info}",

main.py
@@ -32,18 +32,20 @@ from utils.tools import (
    check_ipv6_support,
    get_urls_from_file,
    get_version_info,
    join_url,
    get_urls_len,
    merge_objects,
    get_public_url,
    parse_times
)
from utils.types import CategoryChannelData
from utils.whitelist import load_whitelist_maps, get_section_entries


class UpdateSource:

    def __init__(self):
        self.whitelist_maps = None
        self.blacklist = None
        self.update_progress = None
        self.run_ui = False
        self.tasks = []
@@ -83,15 +85,16 @@ class UpdateSource:
                continue
            if config.open_method[setting]:
                if setting == "subscribe":
                    subscribe_urls = get_urls_from_file(constants.subscribe_path)
                    whitelist_urls = get_urls_from_file(constants.whitelist_path)
                    if not os.getenv("GITHUB_ACTIONS") and config.cdn_url:
                        subscribe_urls = [join_url(config.cdn_url, url) if "raw.githubusercontent.com" in url else url
                                          for url in subscribe_urls]
                    whitelist_subscribe_urls, default_subscribe_urls = get_section_entries(constants.subscribe_path,
                                                                                           pattern=constants.url_pattern)
                    subscribe_urls = list(dict.fromkeys(whitelist_subscribe_urls + default_subscribe_urls))
                    print(t("msg.subscribe_urls_whitelist_total").format(default_count=len(default_subscribe_urls),
                                                                         whitelist_count=len(whitelist_subscribe_urls),
                                                                         total=len(subscribe_urls)))
                task = asyncio.create_task(
                    task_func(subscribe_urls,
                              names=channel_names,
                              whitelist=whitelist_urls,
                              whitelist=whitelist_subscribe_urls,
                              callback=self.update_progress
                              )
                )
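For clarity, the merge relies on dict.fromkeys preserving insertion order, so whitelisted subscribe URLs stay ahead of the defaults while duplicates are dropped. A minimal sketch with placeholder URLs:

    whitelist_subscribe_urls = ["https://example.com/a.m3u"]
    default_subscribe_urls = ["https://example.com/b.m3u", "https://example.com/a.m3u"]
    merged = list(dict.fromkeys(whitelist_subscribe_urls + default_subscribe_urls))
    # merged == ["https://example.com/a.m3u", "https://example.com/b.m3u"]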
@@ -119,7 +122,9 @@ class UpdateSource:
        try:
            main_start_time = time()
            if config.open_update:
                self.channel_items = get_channel_items()
                self.whitelist_maps = load_whitelist_maps(constants.whitelist_path)
                self.blacklist = get_urls_from_file(constants.blacklist_path, pattern_search=False)
                self.channel_items = get_channel_items(self.whitelist_maps, self.blacklist)
                self.channel_data = {}
                channel_names = [
                    name
@@ -139,6 +144,8 @@ class UpdateSource:
                    self.hotel_foodie_result,
                    self.subscribe_result,
                    self.online_search_result,
                    self.whitelist_maps,
                    self.blacklist
                )
                cache_result = self.channel_data
                test_result = {}
@@ -8,7 +8,7 @@ from utils.driver.tools import get_soup_driver
from utils.config import config
import utils.constants as constants
from utils.channel import format_channel_name
from utils.tools import get_pbar_remaining, resource_path, get_name_url
from utils.tools import get_pbar_remaining, resource_path, get_name_value
import json

# import asyncio
@@ -130,12 +130,12 @@ def get_multicast_region_result_by_rtp_txt(callback=None):
            os.path.join(rtp_path, f"{filename}.txt"), "r", encoding="utf-8"
        ) as f:
            for line in f:
                name_url = get_name_url(line, pattern=constants.rtp_pattern)
                if name_url and name_url[0]:
                    channel_name = format_channel_name(name_url[0]["name"])
                    url = name_url[0]["url"]
                    if url not in multicast_result[channel_name][region][type]:
                        multicast_result[channel_name][region][type].append(url)
                name_value = get_name_value(line, pattern=constants.rtp_pattern)
                if name_value and name_value[0]:
                    channel_name = format_channel_name(name_value[0]["name"])
                    value = name_value[0]["value"]
                    if value not in multicast_result[channel_name][region][type]:
                        multicast_result[channel_name][region][type].append(value)
            pbar.update()
            if callback:
                remaining_files = total_files - pbar.n
@@ -1,3 +1,4 @@
import os
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from logging import INFO
@@ -14,8 +15,8 @@ from utils.retry import retry_func
from utils.tools import (
    merge_objects,
    get_pbar_remaining,
    get_name_url,
    get_logger
    get_name_value,
    get_logger, join_url
)
@@ -33,8 +34,15 @@ async def get_channels_by_subscribe_urls(
    """
    Get the channels by subscribe urls
    """
    if not os.getenv("GITHUB_ACTIONS") and config.cdn_url:
        def _map_raw(u):
            return join_url(config.cdn_url, u) if "raw.githubusercontent.com" in u else u

        urls = [_map_raw(u) for u in urls]
        whitelist = [_map_raw(u) for u in whitelist] if whitelist else None
    if whitelist:
        urls.sort(key=lambda url: whitelist.index(url) if url in whitelist else len(whitelist))
        index_map = {u: i for i, u in enumerate(whitelist)}
        urls.sort(key=lambda u: index_map.get(u, len(whitelist)))
    subscribe_results = {}
    subscribe_urls_len = len(urls)
    pbar = tqdm_asyncio(
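The reworked sort builds the position map once instead of calling list.index per URL: whitelisted subscribe URLs are processed first, in whitelist order, and everything else keeps its relative order at the end because Python's sort is stable. A minimal sketch with placeholder URLs:

    whitelist = ["https://example.com/w2.m3u", "https://example.com/w1.m3u"]
    urls = ["https://example.com/other.m3u", "https://example.com/w1.m3u", "https://example.com/w2.m3u"]
    index_map = {u: i for i, u in enumerate(whitelist)}
    urls.sort(key=lambda u: index_map.get(u, len(whitelist)))
    # urls == ["https://example.com/w2.m3u", "https://example.com/w1.m3u", "https://example.com/other.m3u"]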
@@ -81,7 +89,7 @@ async def get_channels_by_subscribe_urls(
            response.encoding = "utf-8"
            content = response.text
            m3u_type = True if "#EXTM3U" in content else False
            data = get_name_url(
            data = get_name_value(
                content,
                pattern=(
                    constants.multiline_m3u_pattern
@@ -91,12 +99,12 @@ async def get_channels_by_subscribe_urls(
                open_headers=config.open_headers if m3u_type else False
            )
            for item in data:
                name = item["name"]
                url = item["url"]
                if name and url:
                    name = format_channel_name(name)
                data_name = item.get("name", "").strip()
                url = item.get("value", "").strip()
                if data_name and url:
                    name = format_channel_name(data_name)
                    if names and name not in names:
                        logger.info(f"{item["name"]},{item["url"]}")
                        logger.info(f"{data_name},{url}")
                        continue
                    url_partition = url.partition("$")
                    url = url_partition[0]
@@ -27,12 +27,11 @@ from utils.speed import (
)
from utils.tools import (
    format_name,
    get_name_url,
    get_name_value,
    check_url_by_keywords,
    get_total_urls,
    add_url_info,
    resource_path,
    get_urls_from_file,
    get_name_urls_from_file,
    get_logger,
    get_datetime_now,
@@ -41,9 +40,11 @@ from utils.tools import (
    convert_to_m3u,
    custom_print,
    get_name_uri_from_dir,
    get_resolution_value, get_public_url
    get_resolution_value,
    get_public_url
)
from utils.types import ChannelData, OriginType, CategoryChannelData, TestResult
from utils.types import ChannelData, OriginType, CategoryChannelData, TestResult, WhitelistMaps
from utils.whitelist import is_url_whitelisted, get_whitelist_url, get_whitelist_total_count

channel_alias = Alias()
ip_checker = IPChecker()
@@ -91,7 +92,7 @@ def check_channel_need_frozen(info: TestResult) -> bool:
    return False


def get_channel_data_from_file(channels, file, whitelist, blacklist,
def get_channel_data_from_file(channels, file, whitelist_maps, blacklist,
                               local_data=None, hls_data=None) -> CategoryChannelData:
    """
    Get the channel data from the file
@@ -103,18 +104,17 @@ def get_channel_data_from_file(channels, file, whitelist, blacklist,
        if "#genre#" in line:
            current_category = line.partition(",")[0]
        else:
            name_url = get_name_url(
                line, pattern=constants.demo_txt_pattern, check_url=False
            name_value = get_name_value(
                line, pattern=constants.demo_txt_pattern, check_value=False
            )
            if name_url and name_url[0]:
                name = name_url[0]["name"]
                url = name_url[0]["url"]
            if name_value and name_value[0]:
                name = name_value[0]["name"]
                url = name_value[0]["value"]
                category_dict = channels[current_category]
                if name not in category_dict:
                    category_dict[name] = []
                if name in whitelist:
                    for whitelist_url in whitelist[name]:
                        category_dict[name].append(format_channel_data(whitelist_url, "whitelist"))
                for whitelist_url in get_whitelist_url(whitelist_maps, name):
                    category_dict[name].append(format_channel_data(whitelist_url, "whitelist"))
                if hls_data and name in hls_data:
                    for hls_url in hls_data[name]:
                        category_dict[name].append(format_channel_data(hls_url, "hls"))
@@ -137,13 +137,15 @@ def get_channel_data_from_file(channels, file, whitelist, blacklist,
                            category_dict[name].append(format_channel_data(local_url, "local"))
                    except re.error:
                        pass
                if open_local and url:
                    if not check_url_by_keywords(url, blacklist):
                if url:
                    if is_url_whitelisted(whitelist_maps, url, name):
                        category_dict[name].append(format_channel_data(url, "whitelist"))
                    elif open_local and not check_url_by_keywords(url, blacklist):
                        category_dict[name].append(format_channel_data(url, "local"))
    return channels


def get_channel_items() -> CategoryChannelData:
def get_channel_items(whitelist_maps, blacklist) -> CategoryChannelData:
    """
    Get the channel items from the source file
    """
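Read together with the previous hunk, a source-file entry is now classified in this order: whitelist match first, then local (if enabled and not blacklisted), otherwise dropped. A minimal standalone sketch of that decision, mirroring the branch above for illustration only (classify is a hypothetical helper, not part of the repo):

    def classify(url, name, whitelist_maps, blacklist, open_local):
        # whitelist wins regardless of open_local or the blacklist
        if is_url_whitelisted(whitelist_maps, url, name):
            return "whitelist"
        # otherwise only kept as a local entry when enabled and not blacklisted
        if open_local and not check_url_by_keywords(url, blacklist):
            return "local"
        return None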
@@ -153,16 +155,17 @@ def get_channel_items() -> CategoryChannelData:
    if config.open_rtmp:
        hls_data = get_name_uri_from_dir(constants.hls_path)
    local_data = get_name_urls_from_file(config.local_file)
    whitelist = get_name_urls_from_file(constants.whitelist_path)
    blacklist = get_urls_from_file(constants.blacklist_path, pattern_search=False)
    whitelist_len = len(list(whitelist.keys()))
    if whitelist_len:
        print(t("msg.whitelist_found").format(count=whitelist_len))
    whitelist_count = get_whitelist_total_count(whitelist_maps)
    blacklist_count = len(blacklist)
    if whitelist_count:
        print(t("msg.whitelist_found").format(count=whitelist_count))
    if blacklist_count:
        print(t("msg.blacklist_found").format(count=blacklist_count))

    if os.path.exists(user_source_file):
        with open(user_source_file, "r", encoding="utf-8") as file:
            channels = get_channel_data_from_file(
                channels, file, whitelist, blacklist, local_data, hls_data
                channels, file, whitelist_maps, blacklist, local_data, hls_data
            )

    if config.open_history:
@@ -531,7 +534,7 @@ def append_data_to_info_data(
        name: str,
        data: list,
        origin: str = None,
        whitelist: list = None,
        whitelist_maps: WhitelistMaps = None,
        blacklist: list = None,
        ipv_type_data: dict = None
) -> None:
@@ -544,7 +547,7 @@ def append_data_to_info_data(
        name: Name key within the category
        data: List of channel items to process
        origin: Default origin for items
        whitelist: List of whitelist keywords
        whitelist_maps: Maps of whitelist keywords
        blacklist: List of blacklist keywords
        ipv_type_data: Dictionary to cache IP type information
    """
@@ -573,14 +576,15 @@ def append_data_to_info_data(
        if not url or url in existing_urls:
            continue

        if url_origin != "whitelist" and whitelist and check_url_by_keywords(url, whitelist):
        if url_origin != "whitelist" and whitelist_maps and is_url_whitelisted(whitelist_maps, url, name):
            url_origin = "whitelist"

        if not url_origin:
            continue

        if url_origin not in retain_origin:
            if url in frozen_channels or blacklist and check_url_by_keywords(url, blacklist):
            url = get_channel_url(url)
            if not url or url in frozen_channels or blacklist and check_url_by_keywords(url, blacklist):
                continue

        if not ipv_type:
@@ -634,7 +638,7 @@ def get_origin_method_name(method):
    return "hotel" if method.startswith("hotel_") else method


def append_old_data_to_info_data(info_data, cate, name, data, whitelist=None, blacklist=None, ipv_type_data=None):
def append_old_data_to_info_data(info_data, cate, name, data, whitelist_maps=None, blacklist=None, ipv_type_data=None):
    """
    Append old existed channel data to total info data
    """
@@ -644,7 +648,7 @@ def append_old_data_to_info_data(info_data, cate, name, data, whitelist=None, bl
        append_data_to_info_data(
            info_data, cate, name, items,
            origin=origin if origin else None,
            whitelist=whitelist,
            whitelist_maps=whitelist_maps,
            blacklist=blacklist,
            ipv_type_data=ipv_type_data
        )
@@ -689,6 +693,8 @@ def append_total_data(
        hotel_foodie_result=None,
        subscribe_result=None,
        online_search_result=None,
        whitelist_maps=None,
        blacklist=None,
):
    """
    Append all method data to total info data
@@ -700,8 +706,6 @@ def append_total_data(
        ("subscribe", subscribe_result),
        ("online_search", online_search_result),
    ]
    whitelist = get_urls_from_file(constants.whitelist_path)
    blacklist = get_urls_from_file(constants.blacklist_path, pattern_search=False)
    url_hosts_ipv_type = {}
    for obj in data.values():
        for value_list in obj.values():
@@ -712,7 +716,8 @@ def append_total_data(
        for name, old_info_list in channel_obj.items():
            print(f"{name}:", end=" ")
            if old_info_list:
                append_old_data_to_info_data(data, cate, name, old_info_list, whitelist=whitelist, blacklist=blacklist,
                append_old_data_to_info_data(data, cate, name, old_info_list, whitelist_maps=whitelist_maps,
                                             blacklist=blacklist,
                                             ipv_type_data=url_hosts_ipv_type)
    for method, result in total_result:
        if config.open_method[method]:
@@ -721,7 +726,8 @@ def append_total_data(
                    continue
                name_results = get_channel_results_by_name(name, result)
                append_data_to_info_data(
                    data, cate, name, name_results, origin=origin_method, whitelist=whitelist, blacklist=blacklist,
                    data, cate, name, name_results, origin=origin_method, whitelist_maps=whitelist_maps,
                    blacklist=blacklist,
                    ipv_type_data=url_hosts_ipv_type
                )
                print(f"{t(f"name.{method}")}:", len(name_results), end=", ")
@@ -54,20 +54,18 @@ url_pattern = re.compile(

rt_url_pattern = re.compile(r"^(rtmp|rtsp)://.*$")

rtp_pattern = re.compile(r"^(?P<name>[^,，]+)[,，]?(?P<url>rtp://.*)$")
rtp_pattern = re.compile(r"^(?P<name>[^,，]+)[,，]?(?P<value>rtp://.*)$")

demo_txt_pattern = re.compile(r"^(?P<name>[^,，]+)[,，]?(?!#genre#)" + r"(" + url_pattern.pattern + r")?")
demo_txt_pattern = re.compile(r"^(?P<name>[^,，]+)[,，]?(?!#genre#)(?P<value>.+)?$")

txt_pattern = re.compile(r"^(?P<name>[^,，]+)[,，](?!#genre#)" + r"(" + url_pattern.pattern + r")")
txt_pattern = re.compile(r"^(?P<name>[^,，]+)[,，](?!#genre#)(?P<value>.+)$")

multiline_txt_pattern = re.compile(r"^(?P<name>[^,，]+)[,，](?!#genre#)" + r"(" + url_pattern.pattern + r")",
                                   re.MULTILINE)
multiline_txt_pattern = re.compile(r"^(?P<name>[^,，]+)[,，](?!#genre#)(?P<value>.+)$", re.MULTILINE)

m3u_pattern = re.compile(
    r"^#EXTINF:-1[\s+,，](?P<attributes>[^,，]+)[,，](?P<name>.*?)\n" + r"(" + url_pattern.pattern + r")")
m3u_pattern = re.compile(r"^#EXTINF:-1[\s+,，](?P<attributes>[^,，]+)[,，](?P<name>.*?)\n(?P<value>.+)$")

multiline_m3u_pattern = re.compile(
    r"^#EXTINF:-1[\s+,，](?P<attributes>[^,，]+)[,，](?P<name>.*?)\n(?P<options>(#EXTVLCOPT:.*\n)*?)" + r"(" + url_pattern.pattern + r")",
    r"^#EXTINF:-1[\s+,，](?P<attributes>[^,，]+)[,，](?P<name>.*?)\n(?P<options>(#EXTVLCOPT:.*\n)*?)(?P<value>.+)$",
    re.MULTILINE)

key_value_pattern = re.compile(r'(?P<key>\w+)=(?P<value>\S+)')
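The switch from a (?P<url>...) group built on url_pattern to a generic (?P<value>.+) group means a line no longer has to contain a well-formed http(s) URL to match. A minimal check with the new txt-style pattern (re-created standalone here for illustration, not imported from the repo):

    import re
    txt_pattern = re.compile(r"^(?P<name>[^,，]+)[,，](?!#genre#)(?P<value>.+)$")
    m = txt_pattern.match("CCTV-1,rtp://239.0.0.1:5000")
    print(m.group("name"), m.group("value"))  # CCTV-1 rtp://239.0.0.1:5000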
@@ -544,22 +544,22 @@ def get_headers_key_value(content: str) -> dict:
    return key_value


def get_name_url(content, pattern, open_headers=False, check_url=True):
def get_name_value(content, pattern, open_headers=False, check_value=True):
    """
    Extract name and URL from content using a regex pattern.
    Extract name and value from content using a regex pattern.
    :param content: str, the input content to search.
    :param pattern: re.Pattern, the compiled regex pattern to match.
    :param open_headers: bool, whether to extract headers.
    :param check_url: bool, whether to validate the presence of a URL.
    :param check_value: bool, whether to validate the presence of a value.
    """
    result = []
    for match in pattern.finditer(content):
        group_dict = match.groupdict()
        name = (group_dict.get("name", "") or "").strip()
        url = (group_dict.get("url", "") or "").strip()
        if not name or (check_url and not url):
        value = (group_dict.get("value", "") or "").strip()
        if not name or (check_value and not value):
            continue
        data = {"name": name, "url": url}
        data = {"name": name, "value": value}
        attributes = {**get_headers_key_value(group_dict.get("attributes", "")),
                      **get_headers_key_value(group_dict.get("options", ""))}
        headers = {
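Callers that previously read item["url"] from get_name_url now read item["value"] from get_name_value. A minimal usage sketch (the sample line is made up; the result dict may also carry extracted headers):

    import utils.constants as constants
    from utils.tools import get_name_value

    items = get_name_value("CCTV-1,http://example.com/live.m3u8", pattern=constants.txt_pattern)
    # items[0] -> {"name": "CCTV-1", "value": "http://example.com/live.m3u8", ...}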
@@ -625,10 +625,10 @@ def get_name_urls_from_file(path: str, format_name_flag: bool = False) -> dict[s
            line = line.strip()
            if line.startswith("#"):
                continue
            name_url = get_name_url(line, pattern=constants.txt_pattern)
            if name_url and name_url[0]:
                name = format_name(name_url[0]["name"]) if format_name_flag else name_url[0]["name"]
                url = name_url[0]["url"]
            name_value = get_name_value(line, pattern=constants.txt_pattern)
            if name_value and name_value[0]:
                name = format_name(name_value[0]["name"]) if format_name_flag else name_value[0]["name"]
                url = name_value[0]["value"]
                if url not in name_urls[name]:
                    name_urls[name].append(url)
    return name_urls
@@ -37,3 +37,5 @@ class TestResult(TypedDict):
TestResultCacheData = dict[str, list[TestResult]]

ChannelTestResult = Union[ChannelData, TestResult]

WhitelistMaps = tuple[dict[str, list[str]], dict[str, list[str]]]
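The new WhitelistMaps alias is the (exact, keywords) pair returned by load_whitelist_maps: both dicts map a channel name to its whitelist entries, with the empty string "" as the key for global entries. An illustrative value (placeholder data):

    maps: WhitelistMaps = (
        {"CCTV-1": ["http://example.com/cctv1.m3u8"], "": ["http://trusted.example.com/a.m3u8"]},
        {"": ["trusted.example.com"]},
    )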
@@ -0,0 +1,169 @@
import os
import re
from collections import defaultdict
from typing import List, Pattern

import utils.constants as constants
from utils.tools import get_real_path, resource_path
from utils.types import WhitelistMaps


def load_whitelist_maps(path: str = constants.whitelist_path) -> WhitelistMaps:
    """
    Load whitelist maps from the given path.
    Returns two dictionaries:
    - exact: channel_name -> list of exact whitelist entries
    - keywords: channel_name -> list of keyword whitelist entries
    The special key "" (empty string) is used for global entries.
    """

    exact = defaultdict(list)
    keywords = defaultdict(list)
    in_keyword_section = False

    real_path = get_real_path(resource_path(path))
    if not os.path.exists(real_path):
        return exact, keywords

    with open(real_path, "r", encoding="utf-8") as f:
        for raw in f:
            line = raw.rstrip("\n")
            s = line.strip()
            if not s or s.startswith("#"):
                continue

            if re.match(r"^\[.*\]$", s):
                in_keyword_section = s.upper() == "[KEYWORDS]"
                continue

            if "," in s:
                name, value = map(str.strip, s.split(",", 1))
                key = name or ""
            else:
                key = ""
                value = s

            if not value:
                continue

            if in_keyword_section:
                if value not in keywords[key]:
                    keywords[key].append(value)
            else:
                if value not in exact[key]:
                    exact[key].append(value)

    return exact, keywords


def is_url_whitelisted(data_map: WhitelistMaps, url: str, channel_name: str | None = None) -> bool:
    """
    Check if the given URL is whitelisted for the specified channel.
    If channel_name is None, only global whitelist entries are considered.
    1. Exact match (channel-specific)
    2. Exact match (global)
    3. Keyword match (channel-specific)
    4. Keyword match (global)
    5. If none match, return False
    """
    if not url or not data_map:
        return False

    exact_map, keyword_map = data_map
    channel_key = channel_name or ""

    def check_exact_for(key):
        for candidate in exact_map.get(key, []):
            if not candidate:
                continue
            c = candidate.strip()
            if c == url:
                return True
        return False

    if check_exact_for(channel_key) or check_exact_for(""):
        return True

    for kw in keyword_map.get(channel_key, []) + keyword_map.get("", []):
        if not kw:
            continue
        if kw in url:
            return True

    return False


def get_whitelist_url(data_map: WhitelistMaps, channel_name: str | None = None) -> List[str]:
    """
    Get the list of whitelisted URLs for the specified channel.
    If channel_name is None, only global whitelist entries are considered.
    """
    exact_map, _ = data_map
    channel_key = channel_name or ""
    whitelist_urls = set()

    for candidate in exact_map.get(channel_key, []) + exact_map.get("", []):
        c = candidate.strip()
        if c:
            whitelist_urls.add(c)

    return list(whitelist_urls)


def get_whitelist_total_count(data_map: WhitelistMaps) -> int:
    """
    Get the total count of unique whitelist entries across all channels.
    """
    exact_map, keyword_map = data_map
    unique_entries = set()

    for entries in exact_map.values():
        for entry in entries:
            unique_entries.add(entry.strip())

    for entries in keyword_map.values():
        for entry in entries:
            unique_entries.add(entry.strip())

    return len(unique_entries)


def get_section_entries(path: str = constants.whitelist_path, section: str = "WHITELIST",
                        pattern: Pattern[str] = None) -> tuple[List[str], List[str]]:
    """
    Get URLs from a specific section in the whitelist file.
    Returns a tuple: (inside_section_list, outside_section_list).
    """
    real_path = get_real_path(resource_path(path))
    if not os.path.exists(real_path):
        return [], []

    inside: List[str] = []
    outside: List[str] = []
    in_section = False
    header_re = re.compile(r"^\[.*\]$")

    with open(real_path, "r", encoding="utf-8") as f:
        for raw in f:
            line = raw.rstrip("\n")
            s = line.strip()
            if not s:
                continue

            if header_re.match(s):
                in_section = s.upper() == f"[{section.upper()}]"
                continue

            if s.startswith("#"):
                continue

            if s:
                target = inside if in_section else outside
                if pattern:
                    match = pattern.search(s)
                    if match:
                        target.append(match.group())
                else:
                    target.append(s)

    return inside, outside
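Putting the new module together, a typical call sequence mirrors what main.py and the channel handling now do (the channel name and URL below are placeholders for illustration):

    import utils.constants as constants
    from utils.whitelist import (
        load_whitelist_maps,
        is_url_whitelisted,
        get_whitelist_url,
        get_whitelist_total_count,
        get_section_entries,
    )

    maps = load_whitelist_maps(constants.whitelist_path)          # (exact, keywords)
    print(get_whitelist_total_count(maps))                        # unique whitelist entries
    print(get_whitelist_url(maps, "CCTV-1"))                      # exact URLs kept for the channel
    print(is_url_whitelisted(maps, "http://example.com/x.m3u8", "CCTV-1"))

    # subscribe.txt: URLs under [WHITELIST] vs. the default ones
    whitelist_urls, default_urls = get_section_entries(constants.subscribe_path,
                                                       pattern=constants.url_pattern)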