
Commit 2845a6e
Commit message: logs
Parent: 2ff5eea

File tree: 4 files changed, +244 -0 lines changed

.DS_Store (6 KB, binary file not shown)

Logs_Anomalies/anomaly-detector.py (+145)
@@ -0,0 +1,145 @@
# anomaly_detector.py
from typing import Dict, List, Optional
import io
import numpy as np
import pandas as pd  # needed to read labels.csv
from datetime import datetime
import tensorflow as tf
from google.cloud import storage
import json


class AnomalyDetector:
    def __init__(self, logger, bucket_name: str):
        self.logger = logger
        self.bucket_name = bucket_name
        self.client = storage.Client()
        self.bucket = self.client.bucket(bucket_name)

        # Define thresholds
        self.thresholds = {
            'missing_labels_ratio': 0.05,
            'corrupt_images_ratio': 0.02,
            'min_image_size': 100 * 100,    # minimum 100x100 pixels
            'max_image_size': 1000 * 1000,  # maximum 1000x1000 pixels
            'class_imbalance_ratio': 0.1    # minimum 10% for any class
        }

    def check_data_completeness(self) -> Dict:
        """Check for missing data and labels"""
        try:
            self.logger.log_task_start("check_data_completeness")

            # Get all image files
            image_files = set(
                blob.name.split('/')[-1]
                for blob in self.bucket.list_blobs(prefix='raw/xray/')
                if blob.name.endswith(('.png', '.jpg', '.jpeg'))
            )

            # Get all labeled files (labels.csv arrives as bytes,
            # so wrap it in BytesIO before handing it to pandas)
            labels_blob = self.bucket.blob('raw/xray/labels.csv')
            labels_df = pd.read_csv(io.BytesIO(labels_blob.download_as_bytes()))
            labeled_files = set(labels_df['image_id'].values)

            # Calculate metrics
            metrics = {
                'total_images': len(image_files),
                'total_labeled': len(labeled_files),
                'missing_labels': len(image_files - labeled_files),
                'extra_labels': len(labeled_files - image_files),
                'timestamp': datetime.now().isoformat()
            }

            # Check for anomalies
            missing_ratio = metrics['missing_labels'] / metrics['total_images']
            if missing_ratio > self.thresholds['missing_labels_ratio']:
                self.logger.log_error(
                    "data_completeness",
                    f"High ratio of missing labels: {missing_ratio:.2%}",
                    alert=True
                )

            return metrics

        except Exception as e:
            self.logger.log_error("check_data_completeness", e)
            raise

    def check_image_quality(self, sample_size: int = 100) -> Dict:
        """Check image quality and format"""
        try:
            self.logger.log_task_start("check_image_quality")

            # Sample images
            blobs = list(self.bucket.list_blobs(prefix='raw/xray/'))[:sample_size]

            metrics = {
                'corrupt_images': 0,
                'invalid_dimensions': 0,
                'invalid_format': 0,
                'samples_checked': len(blobs)
            }

            for blob in blobs:
                try:
                    # Try to decode image
                    image_data = blob.download_as_bytes()
                    image = tf.image.decode_image(image_data)

                    # Check dimensions
                    image_size = image.shape[0] * image.shape[1]
                    if (image_size < self.thresholds['min_image_size'] or
                            image_size > self.thresholds['max_image_size']):
                        metrics['invalid_dimensions'] += 1

                except Exception:
                    metrics['corrupt_images'] += 1

            # Calculate ratios
            metrics['corrupt_ratio'] = metrics['corrupt_images'] / metrics['samples_checked']
            metrics['invalid_dim_ratio'] = metrics['invalid_dimensions'] / metrics['samples_checked']

            # Check for anomalies
            if metrics['corrupt_ratio'] > self.thresholds['corrupt_images_ratio']:
                self.logger.log_error(
                    "image_quality",
                    f"High ratio of corrupt images: {metrics['corrupt_ratio']:.2%}",
                    alert=True
                )

            return metrics

        except Exception as e:
            self.logger.log_error("check_image_quality", e)
            raise

    def check_class_distribution(self) -> Dict:
        """Check for class imbalance"""
        try:
            self.logger.log_task_start("check_class_distribution")

            # Load labels
            labels_blob = self.bucket.blob('raw/xray/labels.csv')
            labels_df = pd.read_csv(io.BytesIO(labels_blob.download_as_bytes()))

            # Calculate class distribution
            class_dist = labels_df['label'].value_counts()
            total_samples = len(labels_df)

            metrics = {
                'class_distribution': class_dist.to_dict(),
                'class_ratios': (class_dist / total_samples).to_dict()
            }

            # Check for class imbalance
            for class_name, ratio in metrics['class_ratios'].items():
                if ratio < self.thresholds['class_imbalance_ratio']:
                    self.logger.log_error(
                        "class_distribution",
                        f"Class {class_name} is underrepresented: {ratio:.2%}",
                        alert=True
                    )

            return metrics

        except Exception as e:
            self.logger.log_error("check_class_distribution", e)
            raise
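
For orientation only (not part of this commit), a minimal usage sketch of the detector: it assumes a logger exposing log_task_start and log_error, such as the PipelineLogger added below, and a placeholder bucket name; the raw/xray/ layout is taken from the code above.

    # Hypothetical usage sketch; the pipeline and bucket names are placeholders.
    logger = PipelineLogger("xray_pipeline")
    detector = AnomalyDetector(logger, bucket_name="example-xray-bucket")

    completeness = detector.check_data_completeness()
    quality = detector.check_image_quality(sample_size=50)
    distribution = detector.check_class_distribution()

    logger.log_metric("missing_labels", completeness["missing_labels"])
    logger.log_metric("corrupt_ratio", quality["corrupt_ratio"])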

Logs_Anomalies/logging-setup.py (+83)
@@ -0,0 +1,83 @@
# logging_config.py
import logging
from logging.handlers import RotatingFileHandler
import os
from datetime import datetime
from typing import Dict, Any, Optional
from airflow.hooks.base import BaseHook
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError


class PipelineLogger:
    def __init__(self, pipeline_name: str):
        self.pipeline_name = pipeline_name
        self.log_dir = f"logs/{pipeline_name}"
        os.makedirs(self.log_dir, exist_ok=True)

        # Set up file handler
        log_file = f"{self.log_dir}/{datetime.now().strftime('%Y%m%d')}.log"
        file_handler = RotatingFileHandler(
            log_file,
            maxBytes=10485760,  # 10MB
            backupCount=5
        )

        # Set up console handler
        console_handler = logging.StreamHandler()

        # Create a formatter and add it to both handlers
        log_format = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(log_format)
        console_handler.setFormatter(log_format)

        # Create logger
        self.logger = logging.getLogger(pipeline_name)
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)

        # Slack client setup
        self.slack_client = self._setup_slack()

    def _setup_slack(self) -> Optional[WebClient]:
        """Set up Slack client using an Airflow connection; returns None if unavailable"""
        try:
            slack_conn = BaseHook.get_connection('slack_webhook')
            return WebClient(token=slack_conn.password)
        except Exception as e:
            self.logger.error(f"Failed to set up Slack client: {str(e)}")
            return None

    def log_task_start(self, task_name: str, params: Dict[str, Any] = None):
        """Log task start with parameters"""
        msg = f"Starting task: {task_name}"
        if params:
            msg += f" with parameters: {params}"
        self.logger.info(msg)

    def log_task_completion(self, task_name: str, metrics: Dict[str, Any] = None):
        """Log task completion with metrics"""
        msg = f"Completed task: {task_name}"
        if metrics:
            msg += f" with metrics: {metrics}"
        self.logger.info(msg)

    def log_error(self, task_name: str, error: Exception, alert: bool = True):
        """Log error and optionally send a Slack alert"""
        error_msg = f"Error in task {task_name}: {str(error)}"
        self.logger.error(error_msg)

        if alert and self.slack_client:
            try:
                self.slack_client.chat_postMessage(
                    channel="#pipeline-alerts",
                    text=f":red_circle: *Pipeline Error*\n{error_msg}"
                )
            except SlackApiError as e:
                self.logger.error(f"Failed to send Slack alert: {str(e)}")

    def log_metric(self, metric_name: str, value: Any):
        """Log a specific metric"""
        self.logger.info(f"Metric - {metric_name}: {value}")

Logs_Anomalies/pipeline-optimizer.py (+16)
@@ -0,0 +1,16 @@
# pipeline_optimizer.py
from airflow.models import DagRun
from airflow.utils.db import provide_session
from datetime import datetime, timedelta
import numpy as np
from typing import Dict, List
import json


class PipelineOptimizer:
    def __init__(self, logger, dag_id: str):
        self.logger = logger
        self.dag_id = dag_id

    @provide_session
    def analyze_task_durations(self, session=None, lookback_days: int = 7) -> Dict:
        """Analyze task durations from Airflow's history"""
