@@ -18,6 +18,7 @@
 
 Options:
 --offline-partial: Generate a partial report using cached data without making API calls
+--recommend-next-n: Recommend the next n top-performing videos, assuming the CSV report has already been created.
 
 Note: This script fetches all publicly available metrics from the YouTube Data API for all videos in the specified playlist.
 Watch time is not available through this API, and dislike counts are no longer public.
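For reference, a minimal sketch of the CLI surface described above, exercised directly against argparse. It mirrors the parser that main() builds later in this diff; the sample value "3" is arbitrary:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--offline-partial", action="store_true")
parser.add_argument("--recommend-next-n", type=int)

args = parser.parse_args(["--recommend-next-n", "3"])
assert args.recommend_next_n == 3       # dashes become underscores in the dest
assert args.offline_partial is False    # store_true flags default to False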
@@ -40,50 +41,53 @@
 PLAYLIST_ID = os.getenv("YOUTUBE_PLAYLIST_ID")
 PROGRESS_FILE = "progress.json"
 
-youtube = build('youtube', 'v3', developerKey=API_KEY)
+youtube = build("youtube", "v3", developerKey=API_KEY)
+
 
 
 def get_playlist_item_count(playlist_id):
     try:
-        request = youtube.playlists().list(
-            part="contentDetails",
-            id=playlist_id
-        )
+        request = youtube.playlists().list(part="contentDetails", id=playlist_id)
         response = request.execute()
-        return int(response['items'][0]['contentDetails']['itemCount'])
+        return int(response["items"][0]["contentDetails"]["itemCount"])
     except HttpError as e:
         print(f"An error occurred while fetching playlist info: {e}")
         return None
 
+
 def get_video_data(video_item):
-    video_id = video_item['snippet']['resourceId']['videoId']
-    title = video_item['snippet']['title']
-
+    video_id = video_item["snippet"]["resourceId"]["videoId"]
+    title = video_item["snippet"]["title"]
+
     try:
-        video_response = youtube.videos().list(
-            part='statistics,contentDetails',
-            id=video_id
-        ).execute()
-
-        if video_response['items']:
-            stats = video_response['items'][0]['statistics']
-            content_details = video_response['items'][0]['contentDetails']
-
-            duration = isodate.parse_duration(content_details.get('duration', 'PT0S')).total_seconds()
-
+        video_response = (
+            youtube.videos()
+            .list(part="statistics,contentDetails", id=video_id)
+            .execute()
+        )
+
+        if video_response["items"]:
+            stats = video_response["items"][0]["statistics"]
+            content_details = video_response["items"][0]["contentDetails"]
+
+            duration = isodate.parse_duration(
+                content_details.get("duration", "PT0S")
+            ).total_seconds()
+
             return {
                 "title": title,
                 "url": f"https://youtu.be/{video_id}",
-                "views": int(stats.get('viewCount', 0)),
-                "likes": int(stats.get('likeCount', 0)),
-                "comments": int(stats.get('commentCount', 0)),
+                "views": int(stats.get("viewCount", 0)),
+                "likes": int(stats.get("likeCount", 0)),
+                "comments": int(stats.get("commentCount", 0)),
                 "duration": duration,
-                "title_length": len(title)
+                "title_length": len(title),
             }
     except HttpError as e:
         print(f"An error occurred: {e}")
-
+
     return None
 
+
 def get_all_playlist_items(playlist_id):
     try:
         total_videos = get_playlist_item_count(playlist_id)
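A note on the duration field above: isodate.parse_duration accepts the ISO 8601 duration strings the YouTube Data API returns and yields a timedelta, so total_seconds() stores the duration as a float number of seconds. A small sketch with an arbitrary sample value:

import isodate

seconds = isodate.parse_duration("PT4M13S").total_seconds()
assert seconds == 253.0  # 4 * 60 + 13, as a float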
@@ -100,58 +104,61 @@ def get_all_playlist_items(playlist_id):
                 part="snippet",
                 playlistId=playlist_id,
                 maxResults=50,
-                pageToken=next_page_token
+                pageToken=next_page_token,
             )
             response = request.execute()
 
-            for item in response['items']:
+            for item in response["items"]:
                 video = get_video_data(item)
                 if video:
                     videos.append(video)
-
+
             if total_videos:
-                print(f"Fetched {len(videos)}/{total_videos} videos ({(len(videos)/total_videos)*100:.2f}%)")
+                print(
+                    f"Fetched {len(videos)}/{total_videos} videos ({(len(videos)/total_videos)*100:.2f}%)"
+                )
             else:
                 print(f"Fetched {len(videos)} videos so far...")
 
-            next_page_token = response.get('nextPageToken')
+            next_page_token = response.get("nextPageToken")
             if not next_page_token:
                 break
 
         print(f"Successfully fetched data for {len(videos)} videos")
 
         # Sort videos by view count in descending order
-        return sorted(videos, key=lambda x: x['views'], reverse=True)
+        return sorted(videos, key=lambda x: x["views"], reverse=True)
 
     except HttpError as e:
         print(f"An error occurred: {e}")
         return []
 
+
 def calculate_percentile(data, percentile):
-    return statistics.quantiles(data, n = 4)[percentile - 1]
+    return statistics.quantiles(data, n=4)[percentile - 1]
+
 
 def generate_report(video_data):
-    metrics = ['views', 'likes', 'comments', 'duration', 'title_length']
+    metrics = ["views", "likes", "comments", "duration", "title_length"]
     report = {metric: {} for metric in metrics}
 
     for metric in metrics:
         values = [v[metric] for v in video_data if v and v[metric] is not None]
-
+
         if values:
             report[metric] = {
-                'max': max(values),
-                'p75': calculate_percentile(values, 3),
-                'p50': calculate_percentile(values, 2),
-                'average': sum(values) / len(values),
-                'p25': calculate_percentile(values, 1)
+                "max": max(values),
+                "p75": calculate_percentile(values, 3),
+                "p50": calculate_percentile(values, 2),
+                "average": sum(values) / len(values),
+                "p25": calculate_percentile(values, 1),
             }
         else:
-            report[metric] = {
-                'max': 0, 'p75': 0, 'p50': 0, 'average': 0, 'p25': 0
-            }
+            report[metric] = {"max": 0, "p75": 0, "p50": 0, "average": 0, "p25": 0}
 
     return report
 
+
 def categorize_videos(video_data, report):
     high_value = []
     low_value = []
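A note on calculate_percentile above: statistics.quantiles(data, n=4) returns the three quartile cut points [p25, p50, p75], so the percentile argument (1, 2, or 3) is really a quartile index. A tiny sketch with made-up numbers:

import statistics

q1, q2, q3 = statistics.quantiles([10, 20, 30, 40, 50], n=4)
assert q2 == 30.0                # the median
assert (q1, q3) == (15.0, 45.0)  # default "exclusive" interpolation

Note that statistics.quantiles raises StatisticsError on fewer than two data points, a case the `if values:` guard in generate_report does not fully rule out.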
@@ -160,45 +167,55 @@ def categorize_videos(video_data, report):
         if video is None:
             continue
         high_count = low_count = 0
-        for metric in ['views', 'likes', 'comments']:
+        for metric in ["views", "likes", "comments"]:
             value = video[metric]
-            if value >= report[metric]['p75']:
+            if value >= report[metric]["p75"]:
                 high_count += 1
-            elif value <= report[metric]['p25']:
+            elif value <= report[metric]["p25"]:
                 low_count += 1
-
+
         if high_count >= 2:
-            high_value.append(video['url'])
+            high_value.append(video["url"])
         if low_count >= 2:
-            low_value.append(video['url'])
+            low_value.append(video["url"])
 
     return high_value, low_value
 
+
 def save_progress(video_data):
-    with open(PROGRESS_FILE, 'w') as f:
-        json.dump({
-            'video_data': video_data,
-            'timestamp': datetime.now().isoformat()
-        }, f)
+    with open(PROGRESS_FILE, "w") as f:
+        json.dump(
+            {"video_data": video_data, "timestamp": datetime.now().isoformat()}, f
+        )
+
 
 def load_progress():
     if os.path.exists(PROGRESS_FILE):
-        with open(PROGRESS_FILE, 'r') as f:
+        with open(PROGRESS_FILE, "r") as f:
             data = json.load(f)
-
-            timestamp = datetime.fromisoformat(data['timestamp'])
+
+            timestamp = datetime.fromisoformat(data["timestamp"])
             if datetime.now() - timestamp > timedelta(hours=24):
                 print("Cache is more than 24 hours old. Do you want to start fresh? (y/n)")
-                if input().lower() == 'y':
+                if input().lower() == "y":
                     return []
-
-            return data['video_data']
+
+            return data["video_data"]
     return []
 
+
 def generate_full_report(video_data):
     # Write CSV report
-    with open('report_video_data.csv', 'w', newline='', encoding='utf-8') as csvfile:
-        fieldnames = ['title', 'url', 'views', 'likes', 'comments', 'duration', 'title_length']
+    with open("report_video_data.csv", "w", newline="", encoding="utf-8") as csvfile:
+        fieldnames = [
+            "title",
+            "url",
+            "views",
+            "likes",
+            "comments",
+            "duration",
+            "title_length",
+        ]
         writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
         writer.writeheader()
         for video in video_data:
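To make the categorize_videos rule concrete: a video is tagged high-value when at least two of its three engagement metrics sit at or above the playlist's p75, and low-value when at least two sit at or below p25. A self-contained sketch with made-up thresholds:

report = {
    "views": {"p75": 1000, "p25": 100},
    "likes": {"p75": 50, "p25": 5},
    "comments": {"p75": 10, "p25": 1},
}
video = {"views": 1500, "likes": 60, "comments": 3}

high = sum(video[m] >= report[m]["p75"] for m in ("views", "likes", "comments"))
low = sum(video[m] <= report[m]["p25"] for m in ("views", "likes", "comments"))
assert high >= 2 and low < 2  # clears p75 on views and likes, so: high-value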
@@ -216,37 +233,129 @@ def generate_full_report(video_data):
     # Categorize videos and write to JSON files
     high_value, low_value = categorize_videos(video_data, report)
 
-    with open('urls_high_value_automated.json', 'w') as f:
+    with open("urls_high_value_automated.json", "w") as f:
         json.dump(high_value, f, indent=2)
 
-    with open('urls_low_value_automated.json', 'w') as f:
+    with open("urls_low_value_automated.json", "w") as f:
         json.dump(low_value, f, indent=2)
 
-    print("\nReport generated successfully. Check 'report_video_data.csv' for detailed data.")
+    print(
+        "\nReport generated successfully. Check 'report_video_data.csv' for detailed data."
+    )
     print("High-value and low-value video URLs have been saved to JSON files.")
 
+
+def load_video_data_from_csv(csv_file="report_video_data.csv"):
+    video_data = []
+    try:
+        with open(csv_file, "r", newline="", encoding="utf-8") as csvfile:
+            reader = csv.DictReader(csvfile)
+            for row in reader:
+                # Convert numeric fields back to integers
+                for field in ["views", "likes", "comments", "duration", "title_length"]:
+                    row[field] = int(float(row[field]))
+                video_data.append(row)
+        return sorted(video_data, key=lambda x: x["views"], reverse=True)
+    except FileNotFoundError:
+        print(
+            f"Error: CSV file '{csv_file}' not found. Please run the report generation first."
+        )
+        return None
+
+
+def load_filter_urls():
+    filter_urls = set()
+    filter_files = [
+        "./urls_low_value_manual.ignoreme.json",
+        "./urls_low_value_manual.json",
+        "./urls_low_value_automated.json",
+    ]
+
+    for file in filter_files:
+        try:
+            with open(file, "r") as f:
+                urls = json.load(f)
+                if isinstance(urls, list):
+                    filter_urls.update(urls)
+                else:
+                    print(
+                        f"Warning: Filter file {file} does not contain a list of URLs. Skipping."
+                    )
+        except FileNotFoundError:
+            print(f"Warning: Filter file {file} not found. Skipping.")
+        except json.JSONDecodeError:
+            print(f"Warning: Filter file {file} is not valid JSON. Skipping.")
+
+    return filter_urls
+
+
+def recommend_next_videos(n):
+    video_data = load_video_data_from_csv()
+    if not video_data:
+        return
+
+    filter_urls = load_filter_urls()
+    recommended = []
+
+    for video in video_data:
+        if video["url"] not in filter_urls:
+            recommended.append(video["url"])
+            if len(recommended) == n:
+                break
+
+    if len(recommended) < n:
+        print(f"Warning: Only {len(recommended)} videos available after filtering.")
+
+    print(
+        f"Here are the top {len(recommended)} video URLs recommended for you to post next:"
+    )
+    for url in recommended:
+        print(url)
+
+
 def main():
-    parser = argparse.ArgumentParser(description="YouTube Playlist Performance Report Generator")
-    parser.add_argument("--offline-partial", action="store_true", help="Generate a partial report using cached data without making API calls")
+    parser = argparse.ArgumentParser(
+        description="YouTube Playlist Performance Report Generator"
+    )
+    parser.add_argument(
+        "--offline-partial",
+        action="store_true",
+        help="Generate a partial report using cached data without making API calls",
+    )
+    parser.add_argument(
+        "--recommend-next-n",
+        type=int,
+        help="Recommend the next n top-performing videos",
+    )
     args = parser.parse_args()
 
-    if args.offline_partial:
-        video_data = load_progress()
+    if args.recommend_next_n is not None:
+        recommend_next_videos(args.recommend_next_n)
+    else:
+        if args.offline_partial:
+            video_data = load_progress()
+            if not video_data:
+                print(
+                    "No cached data available. Please run the script in online mode first."
+                )
+                return
+            print(
+                f"Generating offline partial report based on {len(video_data)} cached videos."
+            )
+        else:
+            print(f"Fetching all playlist data and sorting by view count...")
+            video_data = get_all_playlist_items(PLAYLIST_ID)
+            save_progress(video_data)
+
         if not video_data:
-            print("No cached data available. Please run the script in online mode first.")
+            print(
+                "No valid video data could be retrieved. Please check your API key and playlist ID."
+            )
             return
-        print(f"Generating offline partial report based on {len(video_data)} cached videos.")
-    else:
-        print(f"Fetching all playlist data and sorting by view count...")
-        video_data = get_all_playlist_items(PLAYLIST_ID)
-        save_progress(video_data)
 
-    if not video_data:
-        print("No valid video data could be retrieved. Please check your API key and playlist ID.")
-        return
+        print(f"\nGenerating report for {len(video_data)} videos...")
+        generate_full_report(video_data)
 
-    print(f"\nGenerating report for {len(video_data)} videos...")
-    generate_full_report(video_data)
 
 if __name__ == "__main__":
     main()
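Finally, the heart of the new recommend_next_videos path, reduced to its essentials: the CSV rows arrive already sorted by views, anything listed in a filter file is dropped, and the first n survivors win. A self-contained sketch with made-up URLs:

ranked = ["https://youtu.be/aaa", "https://youtu.be/bbb", "https://youtu.be/ccc"]
filtered_out = {"https://youtu.be/bbb"}  # e.g. loaded from urls_low_value_automated.json

recommended = [url for url in ranked if url not in filtered_out][:2]
assert recommended == ["https://youtu.be/aaa", "https://youtu.be/ccc"]

The intended flow is one online run to produce report_video_data.csv, after which --recommend-next-n runs entirely offline against the CSV and the JSON filter files.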