1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
import tarfile
import arpy
from debian import deb822
from dedup.compression import decompress
from dedup.hashing import hash_file
class MultiHash:
    """Feed the same data to several hash objects at once.

    The wrapped objects are stored, in the order given, in the hashes
    attribute so that callers can inspect each of them afterwards.
    """

    def __init__(self, *hashes):
        # Stored as the tuple of positional arguments, untouched.
        self.hashes = hashes

    def update(self, data):
        """Pass data on to the update method of every wrapped hash object.

        @param data: the chunk to hash; forwarded unchanged
        """
        for hashobj in self.hashes:
            hashobj.update(data)
def get_tar_hashes(tar, hash_functions):
    """Hash every regular file of a TarFile with all given hash functions.

    @type tar: tarfile.TarFile
    @param hash_functions: a sequence of parameter-less functions each creating
        a new hashlib-like object
    @rtype: gen((str, int, {str: str}))
    @returns: an iterable of (filename, filesize, hashes) tuples where
        hashes is a dict mapping hash function names to hash values. Hash
        objects whose hexdigest() is empty or None are left out of the dict.
    """
    for member in tar:
        # isreg() is false for hard links as well, so those are skipped too.
        if member.isreg():
            # Fresh hash objects per file, all fed in a single read pass.
            multi = MultiHash(*[create() for create in hash_functions])
            hash_file(multi, tar.extractfile(member))
            # hexdigest() is evaluated first; the name is only looked up for
            # hash objects that actually produced a value.
            digests = ((hashobj.hexdigest(), hashobj)
                       for hashobj in multi.hashes)
            found = {hashobj.name: value
                     for value, hashobj in digests if value}
            yield (member.name, member.size, found)
def opentar(filelike):
    """Open a file-like object as a tar archive for streaming reads.

    @param filelike: a file-like object providing the tar data
    @rtype: tarfile.TarFile
    @returns: a TarFile opened in stream mode ("r|") using the utf8 encoding
        with the surrogateescape error handler
    """
    settings = dict(mode="r|", encoding="utf8", errors="surrogateescape")
    return tarfile.open(fileobj=filelike, **settings)
class DebExtractor:
    """Base class for extracting desired features from a Debian package.

    The package is walked member by member and each stage is reported through
    a handle_* method. The methods whose body is only a docstring do nothing
    here and exist to be overridden.
    """

    def __init__(self):
        # Position within the ar archive. handle_ar_member advances it
        # "start" -> "version" -> "control" -> "data".
        self.arstate = "start"

    def process(self, filelike):
        """Process a Debian package.

        @param filelike: is a file-like object containing the contents of the
            Debian package and can be read once without seeks.
        """
        archive = arpy.Archive(fileobj=filelike)
        for entry in archive:
            self.handle_ar_member(entry)
        self.handle_ar_end()

    def handle_ar_member(self, arfiledata: arpy.ArchiveFileData) -> None:
        """Handle an ar archive member of the Debian package.

        If you replace this method, you must also replace handle_ar_end and
        none of the methods handle_debversion, handle_control_tar or
        handle_data_tar are called.

        @param arfiledata: the ar member to handle
        @raises ValueError: if the first member is not debian-binary, if its
            contents do not start with "2.", or if an unexpected member
            shows up before control.tar or data.tar
        """
        member_name = arfiledata.header.name
        state = self.arstate

        if state == "start":
            if member_name != b"debian-binary":
                raise ValueError("debian-binary not found")
            contents = arfiledata.read()
            # The hook gets to see the version before it is validated.
            self.handle_debversion(contents)
            if not contents.startswith(b"2."):
                raise ValueError("debian version not recognized")
            self.arstate = "version"
            return

        if state not in ("version", "control"):
            # Everything following data.tar is ignored.
            assert state == "data"
            return

        # Both tar stages work alike: wait for the member with the expected
        # name prefix, hand it to the matching handler and advance the state.
        if state == "version":
            prefix, following = b"control.tar", "control"
        else:
            prefix, following = b"data.tar", "data"

        if member_name.startswith(prefix):
            # The remainder of the member name after the prefix is what
            # decompress receives as its second argument.
            suffix = member_name[len(prefix):].decode("ascii")
            stream = decompress(arfiledata, suffix)
            if state == "version":
                self.handle_control_tar(opentar(stream))
            else:
                self.handle_data_tar(opentar(stream))
            self.arstate = following
        elif not member_name.startswith(b"_"):
            # Members starting with an underscore are skipped silently.
            raise ValueError("unexpected ar member %r" % member_name)

    def handle_ar_end(self):
        """Handle the end of the ar archive of the Debian package.

        @raises ValueError: if the data.tar member was never reached
        """
        if self.arstate == "data":
            return
        raise ValueError("data.tar not found")

    def handle_debversion(self, version):
        """Handle the debian-binary member of the Debian package.

        Called before the version is checked for validity.

        @type version: bytes
        @param version: The full contents of the ar member.
        """

    def handle_control_tar(self, tarfileobj):
        """Handle the control.tar member of the Debian package.

        If you replace this method, none of handle_control_member,
        handle_control_info or handle_control_end are called.

        @type tarfileobj: tarfile.TarFile
        @param tarfileobj: is opened for streaming reads
        @raises ValueError: if the tar contains anything besides regular
            files and a "." directory, or if it has no control file
        """
        found_control = False
        for entry in tarfileobj:
            if not entry.isreg():
                # The only non-file tolerated is the "." directory entry.
                if entry.isdir() and entry.name == ".":
                    continue
                raise ValueError("invalid non-file %r found in control.tar" %
                                 entry.name)
            raw_name = entry.name
            # Report member names without a leading "./".
            plain = raw_name[2:] if raw_name.startswith("./") else raw_name
            data = tarfileobj.extractfile(entry).read()
            self.handle_control_member(plain, data)
            if plain == "control":
                self.handle_control_info(deb822.Packages(data))
                found_control = True
        if not found_control:
            raise ValueError("control missing from control.tar")
        self.handle_control_end()

    def handle_control_member(self, name, content):
        """Handle a file member of the control.tar member of the Debian package.

        @type name: str
        @param name: is the plain member name
        @type content: bytes
        @param content: the full contents of the member
        """

    def handle_control_info(self, info):
        """Handle the control member of the control.tar member of the Debian
        package.

        @type info: deb822.Packages
        """

    def handle_control_end(self):
        """Handle the end of the control.tar member of the Debian package."""

    def handle_data_tar(self, tarfileobj):
        """Handle the data.tar member of the Debian package.

        @type tarfileobj: tarfile.TarFile
        @param tarfileobj: is opened for streaming reads
        """
|