7
7
import pathlib
8
8
import re
9
9
import shutil
10
- import subprocess
10
+ import subprocess # nosec: B404
11
11
import sys
12
12
import tempfile
13
13
import traceback
14
14
import requests
15
15
import zipfile
16
16
from typing import Generator , List , Optional , Tuple
17
+ from urllib .parse import urlparse
17
18
18
19
import click
19
20
30
31
_logger = logging .getLogger (__name__ )
31
32
32
33
METADATA_FILE = "_installer_metadata.json"
34
+ _NETWORK_TIMEOUT_IN_SECONDS = 60
33
35
34
36
35
37
def _parse_version (version : str ) -> Tuple [int , ...]:
@@ -96,7 +98,8 @@ def _get_daqmx_installed_version() -> Optional[str]:
96
98
_logger .debug ("Checking for installed NI-DAQmx version" )
97
99
commands_info = LINUX_COMMANDS [distribution ]
98
100
query_command = commands_info .get_daqmx_version
99
- query_output = subprocess .run (query_command , stdout = subprocess .PIPE , text = True ).stdout
101
+ # Run the package query command defined by _linux_installation_commands.py.
102
+ query_output = subprocess .run (query_command , stdout = subprocess .PIPE , text = True ).stdout # nosec: B603
100
103
101
104
if distribution == "ubuntu" :
102
105
version_match = re .search (r"ii\s+ni-daqmx\s+(\d+\.\d+\.\d+)" , query_output )
@@ -233,16 +236,17 @@ def _install_daqmx_driver_windows_core(download_url: str) -> None:
233
236
Download and launch NI-DAQmx Driver installation in an interactive mode
234
237
235
238
"""
236
- temp_file = None
239
+ _validate_download_url ( download_url )
237
240
try :
238
241
with _multi_access_temp_file () as temp_file :
239
242
_logger .info ("Downloading Driver to %s" , temp_file )
240
- response = requests .get (download_url )
243
+ response = requests .get (download_url , timeout = _NETWORK_TIMEOUT_IN_SECONDS )
241
244
response .raise_for_status ()
242
245
with open (temp_file , 'wb' ) as f :
243
246
f .write (response .content )
244
247
_logger .info ("Installing NI-DAQmx" )
245
- subprocess .run ([temp_file ], shell = True , check = True )
248
+ # Run the installer that we just downloaded from https://download.ni.com.
249
+ subprocess .run ([temp_file ], shell = True , check = True ) # nosec: B602
246
250
except subprocess .CalledProcessError as e :
247
251
_logger .info ("Failed to install NI-DAQmx driver." , exc_info = True )
248
252
raise click .ClickException (
@@ -262,10 +266,11 @@ def _install_daqmx_driver_linux_core(download_url: str, release: str) -> None:
262
266
263
267
"""
264
268
if sys .platform .startswith ("linux" ):
269
+ _validate_download_url (download_url )
265
270
try :
266
271
with _multi_access_temp_file (suffix = ".zip" ) as temp_file :
267
272
_logger .info ("Downloading Driver to %s" , temp_file )
268
- response = requests .get (download_url )
273
+ response = requests .get (download_url , timeout = _NETWORK_TIMEOUT_IN_SECONDS )
269
274
response .raise_for_status ()
270
275
with open (temp_file , 'wb' ) as f :
271
276
f .write (response .content )
@@ -283,7 +288,10 @@ def _install_daqmx_driver_linux_core(download_url: str, release: str) -> None:
283
288
directory_to_extract_to , distro .id (), distro .version (), release
284
289
):
285
290
print ("\n Running command:" , " " .join (command ))
286
- subprocess .run (command , check = True )
291
+ # Run the commands defined in
292
+ # _linux_installation_commands.py to install the package
293
+ # that we just downloaded from https://download.ni.com.
294
+ subprocess .run (command , check = True ) # nosec: B603
287
295
288
296
# Check if the installation was successful
289
297
if not _get_daqmx_installed_version ():
@@ -312,6 +320,13 @@ def _install_daqmx_driver_linux_core(download_url: str, release: str) -> None:
312
320
raise NotImplementedError ("This function is only supported on Linux." )
313
321
314
322
323
+ def _validate_download_url (download_url : str ) -> None :
324
+ """Velidate that the download URL uses https and points to a trusted site."""
325
+ parsed_url = urlparse (download_url )
326
+ if parsed_url .scheme != "https" or parsed_url .netloc != "download.ni.com" :
327
+ raise click .ClickException (f"Unsupported download URL: { download_url } " )
328
+
329
+
315
330
def _ask_user_confirmation (user_message : str ) -> bool :
316
331
"""
317
332
Prompt for user confirmation
0 commit comments