diff --git a/pypdf/_reader.py b/pypdf/_reader.py index 809ccda83..2d3c73d8e 100644 --- a/pypdf/_reader.py +++ b/pypdf/_reader.py @@ -29,8 +29,10 @@ import os import re -from io import BytesIO, UnsupportedOperation +import weakref +from io import BytesIO, FileIO, UnsupportedOperation from pathlib import Path +from types import TracebackType from typing import ( Any, Callable, @@ -39,6 +41,7 @@ List, Optional, Tuple, + Type, Union, cast, ) @@ -99,6 +102,9 @@ class PdfReader(PdfDocCommon): password: Decrypt PDF file at initialization. If the password is None, the file will not be decrypted. Defaults to ``None``. + + Can also be instantiated as a contextmanager which will automatically close + the underlying file pointer if passed via filenames. """ def __init__( @@ -121,9 +127,13 @@ def __init__( "It may not be read correctly.", __name__, ) + + self._opened_automatically = False if isinstance(stream, (str, Path)): - with open(stream, "rb") as fh: - stream = BytesIO(fh.read()) + stream = FileIO(stream, "rb") + self._opened_automatically = True + weakref.finalize(self, stream.close) + self.read(stream) self.stream = stream @@ -153,6 +163,24 @@ def __init__( elif password is not None: raise PdfReadError("Not encrypted file") + def close(self) -> None: + """Close the underlying file handle""" + self.stream.close() + + def __enter__(self) -> "PdfReader": + """Use PdfReader as context manager""" + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + """Close the underlying stream if owned by the PdfReader""" + if self._opened_automatically: + self.close() + @property def root_object(self) -> DictionaryObject: """Provide access to "/Root". standardized with PdfWriter.""" diff --git a/pypdf/generic/_base.py b/pypdf/generic/_base.py index bf6c75a15..d17257cf1 100644 --- a/pypdf/generic/_base.py +++ b/pypdf/generic/_base.py @@ -283,9 +283,6 @@ def indirect_reference(self) -> "IndirectObject": # type: ignore[override] def get_object(self) -> Optional["PdfObject"]: return self.pdf.get_object(self) - def __deepcopy__(self, memo: Any) -> "IndirectObject": - return IndirectObject(self.idnum, self.generation, self.pdf) - def _get_object_with_check(self) -> Optional["PdfObject"]: o = self.get_object() # the check is done here to not slow down get_object() diff --git a/tests/test_page.py b/tests/test_page.py index 2a3cc4fe6..355975c8b 100644 --- a/tests/test_page.py +++ b/tests/test_page.py @@ -1,7 +1,6 @@ """Test the pypdf._page module.""" import json import math -from copy import deepcopy from io import BytesIO from pathlib import Path from random import shuffle @@ -154,25 +153,25 @@ def test_mediabox_expansion_after_rotation( def test_transformation_equivalence(): - pdf_path = RESOURCE_ROOT / "labeled-edges-center-image.pdf" - reader_base = PdfReader(pdf_path) - page_base = reader_base.pages[0] + def get_pages() -> Tuple[PageObject, PageObject]: + pdf_path = RESOURCE_ROOT / "labeled-edges-center-image.pdf" + reader_base = PdfReader(pdf_path) + page_base = reader_base.pages[0] - pdf_path = RESOURCE_ROOT / "box.pdf" - reader_add = PdfReader(pdf_path) - page_box = reader_add.pages[0] + pdf_path = RESOURCE_ROOT / "box.pdf" + reader_add = PdfReader(pdf_path) + page_box = reader_add.pages[0] + return page_box, page_base op = Transformation().scale(2).rotate(45) # Option 1: The new way - page_box1 = deepcopy(page_box) - page_base1 = deepcopy(page_base) + page_box1, page_base1 = get_pages() page_box1.add_transformation(op, expand=True) page_base1.merge_page(page_box1, expand=False) # Option 2: The old way - page_box2 = deepcopy(page_box) - page_base2 = deepcopy(page_base) + page_box2, page_base2 = get_pages() page_base2.merge_transformed_page(page_box2, op, expand=False) page_box2.add_transformation(op) page_base2.merge_page(page_box2) @@ -951,7 +950,7 @@ def test_text_extraction_issue_1091(): @pytest.mark.enable_socket() -def test_empyt_password_1088(): +def test_empty_password_1088(): url = "https://corpora.tika.apache.org/base/docs/govdocs1/941/941536.pdf" name = "tika-941536.pdf" stream = BytesIO(get_data_from_url(url, name=name)) diff --git a/tests/test_reader.py b/tests/test_reader.py index 83b61bc59..1e64483e5 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -853,8 +853,8 @@ def test_extract_text_hello_world(): def test_read_path(): path = Path(RESOURCE_ROOT, "crazyones.pdf") - reader = PdfReader(path) - assert len(reader.pages) == 1 + with PdfReader(path) as reader: + assert len(reader.pages) == 1 def test_read_not_binary_mode(caplog): @@ -1289,15 +1289,15 @@ def test_reader(caplog): # iss #1273 url = "https://github.com/py-pdf/pypdf/files/9464742/shiv_resume.pdf" name = "shiv_resume.pdf" - reader = PdfReader(BytesIO(get_data_from_url(url, name=name))) - assert "Previous trailer can not be read" in caplog.text - caplog.clear() - # first call requires some reparations... - reader.pages[0].extract_text() - caplog.clear() - # ...and now no more required - reader.pages[0].extract_text() - assert caplog.text == "" + with PdfReader(BytesIO(get_data_from_url(url, name=name))) as reader: + assert "Previous trailer can not be read" in caplog.text + caplog.clear() + # first call requires some reparations... + reader.pages[0].extract_text() + caplog.clear() + # ...and now no more required + reader.pages[0].extract_text() + assert caplog.text == "" @pytest.mark.enable_socket() diff --git a/tests/test_writer.py b/tests/test_writer.py index 3460a3a48..0e82c1cd4 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -1118,7 +1118,7 @@ def test_append_multiple(): @pytest.mark.samples() -def test_set_page_label(pdf_file_path): +def test_set_page_label(): src = RESOURCE_ROOT / "GeoBase_NHNC1_Data_Model_UML_EN.pdf" # File without labels reader = PdfReader(src) @@ -1154,8 +1154,9 @@ def test_set_page_label(pdf_file_path): writer.set_page_label(11, 11, "/r") writer.set_page_label(12, 13, "/R") writer.set_page_label(17, 18, "/R") - writer.write(pdf_file_path) - assert PdfReader(pdf_file_path).page_labels == expected + _, buf = writer.write(BytesIO()) + buf.seek(0) + assert PdfReader(buf).page_labels == expected writer = PdfWriter() # Same labels, different set order writer.clone_document_from_reader(reader) @@ -1165,8 +1166,9 @@ def test_set_page_label(pdf_file_path): writer.set_page_label(0, 1, "/r") writer.set_page_label(12, 13, "/R") writer.set_page_label(11, 11, "/r") - writer.write(pdf_file_path) - assert PdfReader(pdf_file_path).page_labels == expected + _, buf = writer.write(BytesIO()) + buf.seek(0) + assert PdfReader(buf).page_labels == expected # Tests labels assigned only in the middle # Tests label assigned to a range already containing labelled ranges @@ -1176,8 +1178,9 @@ def test_set_page_label(pdf_file_path): writer.set_page_label(3, 4, "/a") writer.set_page_label(5, 5, "/A") writer.set_page_label(2, 6, "/r") - writer.write(pdf_file_path) - assert PdfReader(pdf_file_path).page_labels[: len(expected)] == expected + _, buf = writer.write(BytesIO()) + buf.seek(0) + assert PdfReader(buf).page_labels[: len(expected)] == expected # Tests labels assigned inside a previously existing range expected = ["1", "2", "i", "a", "b", "A", "1", "1", "2"] @@ -1187,8 +1190,9 @@ def test_set_page_label(pdf_file_path): writer.set_page_label(2, 6, "/r") writer.set_page_label(3, 4, "/a") writer.set_page_label(5, 5, "/A") - writer.write(pdf_file_path) - assert PdfReader(pdf_file_path).page_labels[: len(expected)] == expected + _, buf = writer.write(BytesIO()) + buf.seek(0) + assert PdfReader(buf).page_labels[: len(expected)] == expected # Tests invalid user input writer = PdfWriter() @@ -1212,7 +1216,6 @@ def test_set_page_label(pdf_file_path): ): writer.set_page_label(0, 5, "/r", start=-1) - pdf_file_path.unlink() src = ( SAMPLE_ROOT / "009-pdflatex-geotopo/GeoTopo.pdf" @@ -1224,18 +1227,18 @@ def test_set_page_label(pdf_file_path): writer = PdfWriter() writer.clone_document_from_reader(reader) writer.set_page_label(2, 3, "/A") - writer.write(pdf_file_path) - assert PdfReader(pdf_file_path).page_labels[: len(expected)] == expected + _, buf = writer.write(BytesIO()) + buf.seek(0) + assert PdfReader(buf).page_labels[: len(expected)] == expected # Tests replacing existing labels expected = ["A", "B", "1", "1", "2"] writer = PdfWriter() writer.clone_document_from_reader(reader) writer.set_page_label(0, 1, "/A") - writer.write(pdf_file_path) - assert PdfReader(pdf_file_path).page_labels[: len(expected)] == expected - - pdf_file_path.unlink() + _, buf = writer.write(BytesIO()) + buf.seek(0) + assert PdfReader(buf).page_labels[: len(expected)] == expected # Tests prefix and start. src = RESOURCE_ROOT / "issue-604.pdf" # File without page labels @@ -1250,7 +1253,7 @@ def test_set_page_label(pdf_file_path): writer.set_page_label(11, 21, "/D", prefix="PAP-") writer.set_page_label(22, 30, "/D", prefix="FOLL-") writer.set_page_label(31, 39, "/D", prefix="HURT-") - writer.write(pdf_file_path) + writer.write(BytesIO()) @pytest.mark.enable_socket() @@ -1452,7 +1455,6 @@ def test_named_dest_page_number(): def test_update_form_fields(tmp_path): - write_data_here = tmp_path / "out.pdf" writer = PdfWriter(clone_from=RESOURCE_ROOT / "FormTestFromOo.pdf") writer.update_page_form_field_values( writer.pages[0], @@ -1476,8 +1478,8 @@ def test_update_form_fields(tmp_path): auto_regenerate=False, ) - writer.write(write_data_here) - reader = PdfReader(write_data_here) + _, buf = writer.write(BytesIO()) + reader = PdfReader(buf) flds = reader.get_fields() assert flds["CheckBox1"]["/V"] == "/Yes" assert flds["CheckBox1"].indirect_reference.get_object()["/AS"] == "/Yes" @@ -1519,8 +1521,6 @@ def test_update_form_fields(tmp_path): auto_regenerate=False, ) - Path(write_data_here).unlink() - @pytest.mark.enable_socket() def test_update_form_fields2():