summaryrefslogtreecommitdiff
path: root/dedup/image.py
blob: a41752822ace8e7c9c09520a00624c26d08b37e9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import io
import struct

import PIL.Image

from .hashing import HashlibLike

class ImageHash:
    """A hash on the contents of an image data type supported by PIL. This
    disregards mode, depth and meta information. Note that due to limitations
    in PIL and the image format (interlacing) the full contents are stored and
    decoded in hexdigest.

    Subclasses provide name_prefix and implement detect()."""
    # update() refuses to buffer more than this many raw bytes
    maxsize = 1024 * 1024 * 32
    # max memory usage is about 5 * maxpixels in bytes
    maxpixels = 1024 * 1024 * 32
    name_prefix: str

    def __init__(self, hashobj: HashlibLike) -> None:
        """
        @param hashobj: a hashlib-like object
        """
        self.hashobj = hashobj
        self.imagedetected = False
        self.content = io.BytesIO()

    def detect(self) -> bool:
        """Inspect the bytes buffered so far in self.content.

        @return: whether the buffered data was recognized as an image;
            update() stops calling this method once it returned a true value
        """
        raise NotImplementedError

    def update(self, data: bytes) -> None:
        """Append data to the buffered image contents.

        @param data: the next chunk of the raw image file
        @raises ValueError: if more than maxsize bytes have been buffered
        """
        self.content.write(data)
        if self.content.tell() > self.maxsize:
            raise ValueError("maximum image size exceeded")
        if not self.imagedetected:
            self.imagedetected = self.detect()

    def copy(self) -> "ImageHash":
        """Return an independent instance with the same buffered contents,
        detection state and a copy of the underlying hash object."""
        new = self.__class__(self.hashobj.copy())
        new.imagedetected = self.imagedetected
        new.content = io.BytesIO(self.content.getvalue())
        # A BytesIO built from initial bytes starts at offset 0. Move to the
        # source's position so that a later update() appends to the buffered
        # data instead of overwriting it from the start.
        new.content.seek(self.content.tell())
        return new

    def digest(self) -> bytes:
        """Always fails; only hexdigest is supported.

        @raises ValueError: unconditionally
        """
        raise ValueError("an ImageHash cannot produce a raw digest")

    def hexdigest(self) -> str:
        """Decode the buffered image and hash its pixels as RGBA quadruples.

        The underlying hash object is copied before hashing and the position
        of the content buffer is restored, so more data can be fed afterwards.

        @return: the hex digest of the pixel data followed by the width and
            the height of the image, each formatted as 8 hex digits
        @raises ValueError: if no image was detected or decoding fails
        """
        if not self.imagedetected:
            raise ValueError("not a image")
        hashobj = self.hashobj.copy()
        pos = self.content.tell()
        try:
            self.content.seek(0)
            try:
                img = PIL.Image.open(self.content)
            except IOError as err:
                raise ValueError("broken header") from err
            width, height = img.size
            pack_rgba = struct.Struct("BBBB").pack
            # special casing easy modes reduces memory usage
            if img.mode == "L":
                def pack(elem):
                    # grayscale: replicate the value, fully opaque
                    return pack_rgba(elem, elem, elem, 255)
            elif img.mode == "RGB":
                def pack(elem):
                    # add a fully opaque alpha channel
                    return pack_rgba(*elem, 255)
            else:
                if img.mode != "RGBA":
                    try:
                        img = img.convert("RGBA")
                    except (SyntaxError, IndexError, IOError) as err:
                        # crazy stuff from PIL
                        raise ValueError("error reading image") from err

                def pack(elem):
                    return pack_rgba(*elem)
            try:
                for elem in img.getdata():
                    hashobj.update(pack(elem))
            except (SyntaxError, IndexError, IOError) as err:
                # crazy stuff from PIL
                raise ValueError("error reading image") from err
        finally:
            # leave the buffer positioned where update() expects it
            self.content.seek(pos)
        return "%s%8.8x%8.8x" % (hashobj.hexdigest(), width, height)

    @property
    def name(self) -> str:
        """The name_prefix followed by the name of the underlying hash."""
        return self.name_prefix + self.hashobj.name


class PNGHash(ImageHash):
    """A hash on the contents of a PNG image."""
    name_prefix = "png_"

    def detect(self) -> bool:
        """Check the buffered data for a PNG signature followed by IHDR.

        @return: False while fewer than 33 bytes are buffered, True once a
            PNG header with acceptable dimensions has been seen
        @raises ValueError: if the data does not start like a PNG file or
            the declared width * height exceeds maxpixels
        """
        # wait until the header and the IHDR chunk have arrived
        if self.content.tell() < 33:
            return False
        buffered = self.content.getvalue()
        if not buffered.startswith(b"\x89PNG\r\n\x1a\n\0\0\0\x0dIHDR"):
            raise ValueError("not a png image")
        # two big-endian 32 bit integers directly after the "IHDR" tag
        width, height = struct.unpack_from(">II", buffered, 16)
        if width * height > self.maxpixels:
            raise ValueError("maximum image pixels exceeded")
        return True

class GIFHash(ImageHash):
    """A hash on the contents of the first frame of a GIF image."""
    name_prefix = "gif_"

    def detect(self) -> bool:
        """Check the buffered data for a GIF magic and its dimensions.

        @return: False while fewer than 10 bytes are buffered, True once a
            GIF87a/GIF89a header with acceptable dimensions has been seen
        @raises ValueError: if the data does not start with a GIF magic or
            the declared width * height exceeds maxpixels
        """
        if self.content.tell() < 10: # magic + logical dimension
            return False
        curvalue = self.content.getvalue()
        if curvalue.startswith((b"GIF87a", b"GIF89a")):
            # two little-endian 16 bit integers directly after the magic
            width, height = struct.unpack("<HH", curvalue[6:10])
            if width * height > self.maxpixels:
                raise ValueError("maximum image pixels exceeded")
            return True
        # the message used to claim "png", copied from the PNG detector
        raise ValueError("not a gif image")