Skip to content

Commit 58aefb2

Browse files
michalslowikowski00michalslowikowski00
and
michalslowikowski00
authored
Added SDFtoGCSOperator (#8740)
Co-authored-by: michalslowikowski00 <michal.slowikowski@polidea.com>
1 parent b7566e1 commit 58aefb2

File tree

10 files changed

+718
-50
lines changed

10 files changed

+718
-50
lines changed

airflow/providers/google/common/hooks/base_google.py

+21-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
from google.auth import _cloud_sdk
4040
from google.auth.environment_vars import CREDENTIALS
4141
from googleapiclient.errors import HttpError
42-
from googleapiclient.http import set_user_agent
42+
from googleapiclient.http import MediaIoBaseDownload, set_user_agent
4343

4444
from airflow import version
4545
from airflow.exceptions import AirflowException
@@ -456,3 +456,23 @@ def provide_authorized_gcloud(self):
456456
creds_content["refresh_token"],
457457
])
458458
yield
459+
460+
@staticmethod
def download_content_from_request(file_handle, request, chunk_size):
    """
    Download a media resource into ``file_handle``, one chunk at a time.

    Note that a regular Python file object is compatible with io.Base and can be used as the
    target stream.

    :param file_handle: The stream in which to write the downloaded bytes.
    :type file_handle: io.Base or file object
    :param request: The media request to perform in chunks.
    :type request: googleapiclient.http.HttpRequest
    :param chunk_size: File will be downloaded in chunks of this many bytes.
    :type chunk_size: int
    """
    downloader = MediaIoBaseDownload(file_handle, request, chunksize=chunk_size)
    done = False
    # The second element of the pair returned by next_chunk() is the completion flag;
    # keep requesting chunks until it becomes truthy.
    while not done:
        _, done = downloader.next_chunk()
    # Push any buffered bytes to the underlying stream before handing it back to the caller.
    file_handle.flush()

airflow/providers/google/marketing_platform/example_dags/example_display_video.py

+64-10
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,34 @@
1919
Example Airflow DAG that shows how to use DisplayVideo.
2020
"""
2121
import os
22+
from typing import Dict
2223

2324
from airflow import models
25+
from airflow.providers.google.cloud.operators.gcs_to_bigquery import GCSToBigQueryOperator
2426
from airflow.providers.google.marketing_platform.operators.display_video import (
25-
GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
26-
GoogleDisplayVideo360DownloadLineItemsOperator, GoogleDisplayVideo360DownloadReportOperator,
27-
GoogleDisplayVideo360RunReportOperator, GoogleDisplayVideo360UploadLineItemsOperator,
27+
GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
28+
GoogleDisplayVideo360DeleteReportOperator, GoogleDisplayVideo360DownloadLineItemsOperator,
29+
GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
30+
GoogleDisplayVideo360SDFtoGCSOperator, GoogleDisplayVideo360UploadLineItemsOperator,
2831
)
2932
from airflow.providers.google.marketing_platform.sensors.display_video import (
30-
GoogleDisplayVideo360ReportSensor,
33+
GoogleDisplayVideo360GetSDFDownloadOperationSensor, GoogleDisplayVideo360ReportSensor,
3134
)
3235
from airflow.utils import dates
3336

3437
# [START howto_display_video_env_variables]
3538
BUCKET = os.environ.get("GMP_DISPLAY_VIDEO_BUCKET", "gs://test-display-video-bucket")
3639
ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID", 1234567)
3740
OBJECT_NAME = os.environ.get("GMP_OBJECT_NAME", "files/report.csv")
41+
PATH_TO_UPLOAD_FILE = os.environ.get(
42+
"GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt"
43+
)
44+
PATH_TO_SAVED_FILE = os.environ.get(
45+
"GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt"
46+
)
47+
BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
48+
SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1")
49+
BQ_DATA_SET = os.environ.get("GMP_BQ_DATA_SET", "airflow_test")
3850

3951
REPORT = {
4052
"kind": "doubleclickbidmanager#query",
@@ -55,14 +67,16 @@
5567
}
5668

5769
PARAMS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"}
70+
71+
BODY_REQUEST: Dict = {
72+
"version": SDF_VERSION,
73+
"advertiserId": ADVERTISER_ID,
74+
"inventorySourceFilter": {"inventorySourceIds": []},
75+
}
5876
# [END howto_display_video_env_variables]
5977

6078
# download_line_items variables
61-
REQUEST_BODY = {
62-
"filterType": ADVERTISER_ID,
63-
"format": "CSV",
64-
"fileSpec": "EWF"
65-
}
79+
REQUEST_BODY = {"filterType": ADVERTISER_ID, "format": "CSV", "fileSpec": "EWF"}
6680

6781
default_args = {"start_date": dates.days_ago(1)}
6882

@@ -119,7 +133,47 @@
119133
upload_line_items = GoogleDisplayVideo360UploadLineItemsOperator(
120134
task_id="upload_line_items",
121135
bucket_name=BUCKET,
122-
object_name=OBJECT_NAME,
136+
object_name=BUCKET_FILE_LOCATION,
123137
)
124138
# [END howto_google_display_video_upload_line_items_operator]
139+
140+
# [START howto_google_display_video_create_sdf_download_task_operator]
141+
create_sdf_download_task = GoogleDisplayVideo360CreateSDFDownloadTaskOperator(
142+
task_id="create_sdf_download_task", body_request=BODY_REQUEST
143+
)
144+
operation_name = '{{ task_instance.xcom_pull("create_sdf_download_task")["name"] }}'
145+
# [END howto_google_display_video_create_sdf_download_task_operator]
146+
147+
# [START howto_google_display_video_wait_for_operation_sensor]
148+
wait_for_operation = GoogleDisplayVideo360GetSDFDownloadOperationSensor(
149+
task_id="wait_for_operation", operation_name=operation_name,
150+
)
151+
# [END howto_google_display_video_wait_for_operation_sensor]
152+
153+
# [START howto_google_display_video_save_sdf_in_gcs_operator]
154+
save_sdf_in_gcs = GoogleDisplayVideo360SDFtoGCSOperator(
155+
task_id="save_sdf_in_gcs",
156+
operation_name=operation_name,
157+
bucket_name=BUCKET,
158+
object_name=BUCKET_FILE_LOCATION,
159+
gzip=False,
160+
)
161+
# [END howto_google_display_video_save_sdf_in_gcs_operator]
162+
163+
# [START howto_google_display_video_gcs_to_big_query_operator]
164+
# The object to load is the one produced by the upstream ``save_sdf_in_gcs`` task, so the XCom
# is pulled from that task id (the previous id, "upload_sdf_to_bigquery", matches no task here).
# NOTE(review): assumes ``save_sdf_in_gcs`` returns the stored object's name via XCom - confirm
# against the operator's ``execute`` return value.
upload_sdf_to_big_query = GCSToBigQueryOperator(
    task_id="upload_sdf_to_big_query",
    bucket=BUCKET,
    source_objects=['{{ task_instance.xcom_pull("save_sdf_in_gcs") }}'],
    destination_project_dataset_table=f"{BQ_DATA_SET}.gcs_to_bq_table",
    schema_fields=[
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
    ],
    write_disposition="WRITE_TRUNCATE",
    dag=dag,
)
176+
# [END howto_google_display_video_gcs_to_big_query_operator]
177+
125178
create_report >> run_report >> wait_for_report >> get_report >> delete_report
179+
create_sdf_download_task >> wait_for_operation >> save_sdf_in_gcs >> upload_sdf_to_big_query

airflow/providers/google/marketing_platform/hooks/display_video.py

+66-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,20 @@ def get_conn(self) -> Resource:
5656
)
5757
return self._conn
5858

59+
def get_conn_to_display_video(self) -> Resource:
    """
    Retrieves connection to DisplayVideo.

    The "displayvideo" client is built on first use and cached on its own attribute.

    :return: The discovery client for the "displayvideo" service.
    :rtype: Resource
    """
    # ``get_conn`` hands back ``self._conn``; caching this client under the same attribute
    # would make both getters return whichever client happened to be built first.
    # ``getattr`` is used because the attribute is not guaranteed to be set in ``__init__``.
    conn = getattr(self, "_display_video_conn", None)
    if not conn:
        http_authorized = self._authorize()
        conn = build(
            "displayvideo",
            self.api_version,
            http=http_authorized,
            cache_discovery=False,
        )
        self._display_video_conn = conn
    return conn
72+
5973
def create_query(self, query: Dict[str, Any]) -> Dict:
6074
"""
6175
Creates a query.
@@ -111,7 +125,7 @@ def list_queries(self, ) -> List[Dict]:
111125
.listqueries()
112126
.execute(num_retries=self.num_retries)
113127
)
114-
return response.get('queries', [])
128+
return response.get("queries", [])
115129

116130
def run_query(self, query_id: str, params: Dict[str, Any]) -> None:
117131
"""
@@ -170,3 +184,54 @@ def download_line_items(self, request_body: Dict[str, Any]) -> List[Any]:
170184
.execute(num_retries=self.num_retries)
171185
)
172186
return response["lineItems"]
187+
188+
def create_sdf_download_operation(self, body_request: Dict[str, Any]) -> Dict[str, Any]:
    """
    Creates an SDF Download Task and returns the resulting Operation.

    More information about the body request can be found here:
    https://developers.google.com/display-video/api/reference/rest/v1/sdfdownloadtasks/create

    :param body_request: Body request.
    :type body_request: Dict[str, Any]
    """
    sdf_tasks = self.get_conn_to_display_video().sdfdownloadtasks()  # pylint: disable=no-member
    create_request = sdf_tasks.create(body=body_request)
    return create_request.execute(num_retries=self.num_retries)
206+
207+
def get_sdf_download_operation(self, operation_name: str) -> Dict[str, Any]:
    """
    Gets the latest state of an asynchronous SDF download task operation.

    :param operation_name: The name of the operation resource.
    :type operation_name: str
    :return: The response of the ``sdfdownloadtasks.operations.get`` call.
    :rtype: Dict[str, Any]
    """
    # The API resource is ``sdfdownloadtasks.operations`` (plural); the generated client
    # exposes it as ``operations()``.
    result = (
        self.get_conn_to_display_video()  # pylint: disable=no-member
        .sdfdownloadtasks()
        .operations()
        .get(name=operation_name)
        .execute(num_retries=self.num_retries)
    )
    return result
223+
224+
def download_media(self, resource_name: str):
    """
    Builds the request that downloads media.

    The request is returned without being executed, so the caller decides how to run it.

    :param resource_name: The resource name of the media that is being downloaded.
    :type resource_name: str
    :return: The unexecuted media download request.
    """
    # The API names this parameter ``resourceName``; the generated client takes keyword
    # arguments under the API's own names, not snake_case ones.
    request = (
        self.get_conn_to_display_video()  # pylint: disable=no-member
        .media()
        .download_media(resourceName=resource_name)
    )
    return request

0 commit comments

Comments
 (0)