# find-iptv-channels-details.py
import os
import sys
import requests
import json
import argparse
import subprocess
import signal
import csv
from datetime import datetime
CACHE_FILE_PATTERN = "cache-{server}-{data_type}.json"
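# For example, with --server example.com the live-streams cache would be
# written to cache-example.com-live_streams.json (illustrative name only).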
DEBUG_MODE = False  # Default: debugging is off


def debug_log(message):
    """
    Logs a debug message if debugging is enabled.
    """
    if DEBUG_MODE:
        print(f"[DEBUG] {message}")


def load_cache(server, data_type):
    """Load data from the cache file if it exists and is up-to-date."""
    debug_log(f"Attempting to load cache for {data_type} on server {server}")
    cache_file = CACHE_FILE_PATTERN.format(server=server, data_type=data_type)
    if os.path.exists(cache_file):
        # Check if the cache file was created today
        file_date = datetime.fromtimestamp(os.path.getmtime(cache_file)).date()
        if file_date == datetime.today().date():
            try:
                with open(cache_file, 'r') as file:
                    return json.load(file)
            except (OSError, IOError, json.JSONDecodeError) as e:
                print(f"Error reading cache file {cache_file}: {e}", file=sys.stderr)
    return None


def save_cache(server, data_type, data):
    """Save data to the cache file."""
    debug_log(f"Attempting to save cache for {data_type} on server {server}")
    cache_file = CACHE_FILE_PATTERN.format(server=server, data_type=data_type)
    try:
        with open(cache_file, 'w') as file:
            json.dump(data, file)
    except (OSError, IOError) as e:
        print(f"Error saving cache file {cache_file}: {e}", file=sys.stderr)
def download_data(server, user, password, endpoint, additional_params=None):
    """Download data from the Xtream IPTV server."""
    debug_log(f"Downloading data from {server}, endpoint: {endpoint}")
    url = f"http://{server}/player_api.php"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
    }
    params = {"username": user, "password": password, "action": endpoint}
    if additional_params:
        params.update(additional_params)
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        debug_log(f"Response from server ({endpoint}): {response.text[:500]}")  # Print first 500 characters
        return response.json()
    else:
        print(f"Failed to fetch {endpoint} data: {response.status_code}", file=sys.stderr)
        sys.exit(1)


def check_epg(server, user, password, stream_id):
    """Check EPG data for a specific channel."""
    debug_log(f"Checking EPG for stream ID {stream_id}")
    epg_data = download_data(server, user, password, "get_simple_data_table", {"stream_id": stream_id})
    if isinstance(epg_data, dict) and epg_data.get("epg_listings"):
        return len(epg_data["epg_listings"])  # Return the count of EPG entries
    elif isinstance(epg_data, list):
        debug_log(f"Unexpected list response for EPG data: {epg_data}")
        return len(epg_data)  # Return the length of the list
    else:
        debug_log(f"Unexpected EPG response type: {type(epg_data)}")
        return 0  # No EPG data available
def filter_data(live_categories, live_streams, group, channel):
    """Filter the live streams based on group and channel arguments."""
    filtered_streams = []
    group = group.lower() if group else None
    channel = channel.lower() if channel else None
    for stream in live_streams:
        # Filter by group if specified
        if group:
            matching_categories = [cat for cat in live_categories if group in cat["category_name"].lower()]
            if not any(cat["category_id"] == stream["category_id"] for cat in matching_categories):
                continue
        # Filter by channel if specified
        if channel and channel not in stream["name"].lower():
            continue
        # Add the stream to the filtered list
        filtered_streams.append(stream)
    return filtered_streams
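# Illustrative only: filter_data() expects entries shaped roughly like the
# placeholders below (keys taken from the lookups above, values invented):
#   live_categories: [{"category_id": "12", "category_name": "Sports"}]
#   live_streams:    [{"stream_id": 101, "name": "Example Channel HD", "category_id": "12"}]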
def check_ffprobe():
    """
    Checks if the ffprobe command is available on the system.
    Exits the program with an error message if not found.
    """
    try:
        subprocess.run(
            ["ffprobe", "-version"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
        )
        debug_log("ffprobe is installed and reachable.")
    except FileNotFoundError:
        print("Error: ffprobe is not installed or not found in the system PATH. Please install ffprobe before running this program.")
        sys.exit(1)
    except subprocess.CalledProcessError as e:
        print(f"Error: ffprobe check failed with error: {e}")
        sys.exit(1)
def check_channel(url):
    """
    Probe a stream URL with ffprobe and return its codec, resolution,
    and frame rate, or a "not working"/"error" status.
    """
    try:
        result = subprocess.run(
            [
                "ffprobe",
                "-v", "error",
                "-select_streams", "v:0",
                "-show_entries", "stream=codec_name,width,height,avg_frame_rate",
                "-of", "json",
                url
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        output = json.loads(result.stdout)
        if 'streams' in output and len(output['streams']) > 0:
            stream_info = output['streams'][0]
            codec_name = stream_info.get('codec_name', 'Unknown')[:5]
            width = stream_info.get('width', 'Unknown')
            height = stream_info.get('height', 'Unknown')
            avg_frame_rate = stream_info.get('avg_frame_rate', 'Unknown')
            if avg_frame_rate != 'Unknown' and '/' in avg_frame_rate:
                num, denom = map(int, avg_frame_rate.split('/'))
                frame_rate = round(num / denom) if denom != 0 else "Unknown"
            else:
                frame_rate = avg_frame_rate
            return {
                "status": "working",
                "codec_name": codec_name,
                "width": width,
                "height": height,
                "frame_rate": frame_rate
            }
        else:
            debug_log(f"No streams found in ffprobe output for URL: {url}")
            return {"status": "not working"}
    except Exception as e:
        debug_log(f"Error in check_channel: {e}")
        return {"status": "error", "error_message": str(e)}
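# For reference, ffprobe's JSON output for the query above typically looks
# roughly like this (values are made-up placeholders):
#   {"streams": [{"codec_name": "h264", "width": 1920, "height": 1080,
#                 "avg_frame_rate": "25/1"}]}
# check_channel() relies only on the "streams" list and the four keys shown.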
def save_to_csv(file_name, data, fieldnames):
    """
    Save data to a CSV file, ensuring all fields are enclosed in double quotes.
    :param file_name: The name of the CSV file to save.
    :param data: A list of dictionaries containing the data to write.
    :param fieldnames: A list of field names for the CSV header.
    """
    try:
        with open(file_name, "w", newline="", encoding="utf-8") as file:
            writer = csv.DictWriter(file, fieldnames=fieldnames, quoting=csv.QUOTE_ALL)
            writer.writeheader()
            writer.writerows(data)
        print(f"Output saved to {file_name}")
    except Exception as e:
        print(f"Error saving to CSV: {e}")


def handle_sigint(sig, frame):
    """Handle Ctrl+C gracefully."""
    print("\nProgram interrupted by user. Exiting...")
    sys.exit(0)
def main():
    global DEBUG_MODE
    # Set up the signal handler for Ctrl+C
    signal.signal(signal.SIGINT, handle_sigint)

    parser = argparse.ArgumentParser(description="Xtream IPTV Downloader and Filter")
    parser.add_argument("--server", required=True, help="The Xtream server to connect to.")
    parser.add_argument("--user", required=True, help="The username to use.")
    parser.add_argument("--pw", required=True, help="The password to use.")
    parser.add_argument("--nocache", action="store_true", help="Force download and ignore cache.")
    parser.add_argument("--channel", help="Filter by channel name.")
    parser.add_argument("--category", help="Filter by category name.")
    parser.add_argument("--debug", action="store_true", help="Enable debug mode.")
    parser.add_argument("--epgcheck", action="store_true", help="Check if channels provide EPG data and count entries.")
    parser.add_argument("--check", action="store_true", help="Check stream resolution and frame rate using ffprobe.")
    parser.add_argument("--save", help="Save the output to a CSV file. Provide the file name.")
    args = parser.parse_args()

    # Print the run date and time, masking the first label of the server name
    masked_server = '.'.join(['xxxxx'] + args.server.split('.')[1:])
    run_time = datetime.now().strftime("%Y-%m-%d %H:%M")
    print(f"\n\nfind-iptv-channels-details - Running for server {masked_server} on {run_time}\n")

    # Enable debug mode if the --debug flag is present
    DEBUG_MODE = args.debug
    debug_log("Debug mode enabled")  # Only printed when debug mode is enabled

    # Check ffprobe if --check is enabled
    if args.check:
        check_ffprobe()
    # Check cache or download data
    live_categories = load_cache(args.server, "live_categories") if not args.nocache else None
    live_streams = load_cache(args.server, "live_streams") if not args.nocache else None
    if not live_categories or not live_streams:
        live_categories = download_data(args.server, args.user, args.pw, "get_live_categories")
        live_streams = download_data(args.server, args.user, args.pw, "get_live_streams")
        save_cache(args.server, "live_categories", live_categories)
        save_cache(args.server, "live_streams", live_streams)

    # Filter data
    filtered_streams = filter_data(live_categories, live_streams, args.category, args.channel)

    # Prepare CSV data and headers
    csv_data = []
    fieldnames = ["Stream ID", "Name", "Category", "Archive", "EPG", "Codec", "Resolution", "Frame Rate"]

    # Print and collect results
    print(f"{'ID':<10}{'Name':<60} {'Category':<40}{'Archive':<8}{'EPG':<5}{'Codec':<8}{'Resolution':<15}{'Frame':<10}")
    print("=" * 152)
    category_map = {cat["category_id"]: cat["category_name"] for cat in live_categories}
    for stream in filtered_streams:
        category_name = category_map.get(stream["category_id"], "Unknown")
        epg_count = (
            check_epg(args.server, args.user, args.pw, stream["stream_id"]) if args.epgcheck else ""
        )
        # Generate stream URL
        stream_url = f"http://{args.server}/{args.user}/{args.pw}/{stream['stream_id']}"
        stream_info = (
            check_channel(stream_url) if args.check else {"codec_name": "", "width": "", "height": "", "frame_rate": ""}
        )
        resolution = f"{stream_info.get('width', 'N/A')}x{stream_info.get('height', 'N/A')}" if args.check else ""
        # Print to console
        print(f"{stream['stream_id']:<10}{stream['name'][:60]:<60} {category_name[:40]:<40}{stream.get('tv_archive_duration', 'N/A'):<8}{epg_count:<5}{stream_info.get('codec_name', 'N/A'):<8}{resolution:<15}{stream_info.get('frame_rate', 'N/A'):<10}")

        # Collect data for CSV
        csv_data.append({
            "Stream ID": stream["stream_id"],
            "Name": stream['name'][:60],
            "Category": category_name[:40],
            "Archive": stream.get('tv_archive_duration', 'N/A'),
            "EPG": epg_count,
            "Codec": stream_info.get('codec_name', 'N/A'),
            "Resolution": resolution,
            "Frame Rate": stream_info.get('frame_rate', 'N/A'),
        })

    print("\n")

    # Write to CSV if --save is provided
    if args.save:
        save_to_csv(args.save, csv_data, fieldnames)

    print("\n\n")


if __name__ == "__main__":
    main()
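# Example invocation (illustrative; server, credentials, and file names are placeholders):
#   python find-iptv-channels-details.py --server example.com:8080 --user USER --pw PASS \
#       --category "sports" --epgcheck --check --save channels.csv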