]> git.phdru.name Git - mimedecode.git/blob - mimedecode/mimedecode.py
Version 3.2.0: Copy `mailcap.py` from Python 3.12
[mimedecode.git] / mimedecode / mimedecode.py
1 """Decode MIME message"""
2
3 import os
4 import shutil
5 import subprocess
6 import sys
7
8 from .__version__ import __version__, __copyright__
9
10 if sys.version_info[0] >= 3:
11     # Replace email.message._formatparam with _formatparam from Python 2.7
12     # to avoid re-encoding non-ascii params.
13     from mimedecode import formatparam_27  # noqa: F401: Imported for its side effect
14
15 me = os.path.basename(sys.argv[0])
16
17
18 def output_headers(msg):
19     unix_from = msg.get_unixfrom()
20     if unix_from:
21         output(unix_from)
22         output(os.linesep)
23     for key, value in msg.items():
24         output(key)
25         output(": ")
26         value = value.split(';', 1)
27         output(value[0])
28         if len(value) == 2:
29             output(";")
30             output(_decode_header(value[1], strip=False))
31         output(os.linesep)
32     output(os.linesep)  # End of headers
33
34
35 def recode_if_needed(s, charset):
36     if bytes is str:  # Python2
37         if isinstance(s, bytes) and \
38                 charset and charset.lower() != g.default_encoding:
39             s = s.decode(charset, "replace").\
40                 encode(g.default_encoding, "replace")
41     else:  # Python3
42         if isinstance(s, bytes):
43             s = s.decode(charset, "replace")
44     return s
45
46
47 def _decode_header(s, strip=True):
48     """Return a decoded string according to RFC 2047.
49     NOTE: This is almost the same as email.Utils.decode.
50     """
51     import email.header
52
53     L = email.header.decode_header(s)
54     if not isinstance(L, list):
55         # s wasn't decoded
56         return s
57
58     rtn = []
59     for atom, charset in L:
60         atom = recode_if_needed(atom, charset or g.default_encoding)
61         if strip:
62             atom = atom.strip()
63         rtn.append(atom)
64
65     # Now that we've decoded everything, we just need to join all the parts
66     # together into the final string.
67     return ' '.join(rtn)
68
69
70 def decode_header(msg, header):
71     "Decode mail header (if exists) and put it back, if it was encoded"
72
73     if header in msg:
74         value = msg[header]
75         new_value = _decode_header(value)
76         if new_value != value:  # do not bother to touch msg if not changed
77             set_header(msg, header, new_value)
78
79
80 def decode_header_param(msg, header, param):
81     """Decode mail header's parameter
82
83     Decode mail header's parameter (if exists)
84     and put it back if it was encoded.
85     """
86     if header in msg:
87         value = msg.get_param(param, header=header)
88         if value:
89             if isinstance(value, tuple):
90                 new_value = recode_if_needed(value[2], value[0])
91             else:
92                 new_value = _decode_header(value)
93             if new_value != value:  # do not bother to touch msg if not changed
94                 msg.set_param(param, new_value, header)
95
96
97 def _get_exceptions(list):
98     return [x[1:].lower() for x in list[1:] if x[0] == '-']
99
100
101 def _decode_headers_params(msg, header, decode_all_params, param_list):
102     if decode_all_params:
103         params = msg.get_params(header=header)
104         if params:
105             for param, value in params:
106                 if param not in param_list:
107                     decode_header_param(msg, header, param)
108     else:
109         for param in param_list:
110             decode_header_param(msg, header, param)
111
112
113 def _remove_headers_params(msg, header, remove_all_params, param_list):
114     if remove_all_params:
115         params = msg.get_params(header=header)
116         if params:
117             if param_list:
118                 for param, value in params:
119                     if param not in param_list:
120                         msg.del_param(param, header)
121             else:
122                 value = msg[header]
123                 if value is None:  # No such header
124                     return
125                 if ';' not in value:  # There are no parameters
126                     return
127                 del msg[header]  # Delete all such headers
128                 # Get the value without parameters and set it back
129                 msg[header] = value.split(';')[0].strip()
130     else:
131         for param in param_list:
132             msg.del_param(param, header)
133
134
135 def decode_headers(msg):
136     "Decode message headers according to global options"
137
138     for header_list in g.remove_headers:
139         header_list = header_list.split(',')
140         if header_list[0] == '*':  # Remove all headers except listed
141             header_list = _get_exceptions(header_list)
142             for header in msg.keys():
143                 if header.lower() not in header_list:
144                     del msg[header]
145         else:  # Remove listed headers
146             for header in header_list:
147                 del msg[header]
148
149     for header_list, param_list in g.remove_headers_params:
150         header_list = header_list.split(',')
151         param_list = param_list.split(',')
152         # Remove all params except listed.
153         remove_all_params = param_list[0] == '*'
154         if remove_all_params:
155             param_list = _get_exceptions(param_list)
156         if header_list[0] == '*':  # Remove for all headers except listed
157             header_list = _get_exceptions(header_list)
158             for header in msg.keys():
159                 if header.lower() not in header_list:
160                     _remove_headers_params(
161                         msg, header, remove_all_params, param_list)
162         else:  # Decode for listed headers
163             for header in header_list:
164                 _remove_headers_params(
165                     msg, header, remove_all_params, param_list)
166
167     for header_list in g.decode_headers:
168         header_list = header_list.split(',')
169         if header_list[0] == '*':  # Decode all headers except listed
170             header_list = _get_exceptions(header_list)
171             for header in msg.keys():
172                 if header.lower() not in header_list:
173                     decode_header(msg, header)
174         else:  # Decode listed headers
175             for header in header_list:
176                 decode_header(msg, header)
177
178     for header_list, param_list in g.decode_header_params:
179         header_list = header_list.split(',')
180         param_list = param_list.split(',')
181         # Decode all params except listed.
182         decode_all_params = param_list[0] == '*'
183         if decode_all_params:
184             param_list = _get_exceptions(param_list)
185         if header_list[0] == '*':  # Decode for all headers except listed
186             header_list = _get_exceptions(header_list)
187             for header in msg.keys():
188                 if header.lower() not in header_list:
189                     _decode_headers_params(
190                         msg, header, decode_all_params, param_list)
191         else:  # Decode for listed headers
192             for header in header_list:
193                 _decode_headers_params(
194                     msg, header, decode_all_params, param_list)
195
196
197 def set_header(msg, header, value):
198     "Replace header"
199
200     if header in msg:
201         msg.replace_header(header, value)
202     else:
203         msg[header] = value
204
205
206 def set_content_type(msg, newtype, charset=None):
207     msg.set_type(newtype)
208
209     if charset:
210         msg.set_param("charset", charset, "Content-Type")
211
212
213 caps = None  # Globally stored mailcap database; initialized only if needed
214
215
216 def decode_body(msg, s):
217     "Decode body to plain text using first copiousoutput filter from mailcap"
218
219     try:
220         import mailcap
221     except ImportError:  # Python 3.13
222         from mimedecode import mailcap_312 as mailcap
223     import tempfile
224
225     global caps
226     if caps is None:
227         caps = mailcap.getcaps()
228
229     content_type = msg.get_content_type()
230     if content_type.startswith('text/'):
231         charset = msg.get_content_charset()
232     else:
233         charset = None
234     tmpfile = tempfile.NamedTemporaryFile()
235     command = None
236
237     entries = mailcap.lookup(caps, content_type, "view")
238     for entry in entries:
239         if 'copiousoutput' in entry:
240             if 'test' in entry:
241                 test = mailcap.subst(entry['test'], content_type, tmpfile.name)
242                 if test and os.system(test) != 0:
243                     continue
244             command = mailcap.subst(entry["view"], content_type, tmpfile.name)
245             break
246
247     if not command:
248         return s
249
250     if charset and bytes is not str and isinstance(s, bytes):  # Python3
251         s = s.decode(charset, "replace")
252     if not isinstance(s, bytes):
253         s = s.encode(g.default_encoding, "replace")
254     tmpfile.write(s)
255     tmpfile.flush()
256
257     pipe = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
258     new_s = pipe.stdout.read()
259     pipe.stdout.close()
260     if pipe.wait() == 0:  # result=0, Ok
261         s = new_s
262         if bytes is not str and isinstance(s, bytes):  # Python3
263             s = s.decode(g.default_encoding, "replace")
264         if charset and not isinstance(s, bytes):
265             s = s.encode(charset, "replace")
266         set_content_type(msg, "text/plain")
267         msg["X-MIME-Autoconverted"] = \
268             "from %s to text/plain by %s id %s" \
269             % (content_type, g.host_name, command.split()[0])
270     else:
271         msg["X-MIME-Autoconverted"] = \
272             "failed conversion from %s to text/plain by %s id %s" \
273             % (content_type, g.host_name, command.split()[0])
274     tmpfile.close()  # Will be removed on close
275
276     return s
277
278
279 def recode_charset(msg, s):
280     "Recode charset of the message to the default charset"
281
282     save_charset = charset = msg.get_content_charset()
283     if charset and charset.lower() != g.default_encoding:
284         s = recode_if_needed(s, charset)
285         content_type = msg.get_content_type()
286         set_content_type(msg, content_type, g.default_encoding)
287         msg["X-MIME-Autoconverted"] = \
288             "from %s to %s by %s id %s" \
289             % (save_charset, g.default_encoding, g.host_name, me)
290     return s
291
292
293 def totext(msg, instring):
294     "Convert instring content to text"
295
296     # Decode body and recode charset
297     s = decode_body(msg, instring)
298     if g.recode_charset:
299         s = recode_charset(msg, s)
300
301     output_headers(msg)
302     output(s)
303     return s
304
305
306 mimetypes = None
307
308
309 def _guess_extension(ctype):
310     global mimetypes
311     if mimetypes is None:
312         import mimetypes
313         mimetypes.init()
314         user_mime_type = os.path.expanduser('~/.mime.types')
315         if os.path.exists(user_mime_type):
316             mimetypes._db.read(user_mime_type)
317     return mimetypes.guess_extension(ctype)
318
319
320 def _save_message(msg, outstring, save_headers=False, save_body=False):
321     for header, param in (
322         ("Content-Disposition", "filename"),
323         ("Content-Type", "name"),
324     ):
325         fname = msg.get_param(param, header=header)
326         if fname:
327             if isinstance(fname, tuple):
328                 fname = fname[2]  # Do not recode if it isn't recoded yet
329             try:
330                     for forbidden in chr(0), '/', '\\':
331                         if forbidden in fname:
332                             raise ValueError
333             except ValueError:
334                 continue
335             fname = '-' + fname
336             break
337     else:
338         fname = ''
339     g.save_counter += 1
340     fname = str(g.save_counter) + fname
341     if '.' not in fname:
342         ext = _guess_extension(msg.get_content_type())
343         if ext:
344             fname += ext
345
346     global output
347     save_output = output
348     outfile = open_output_file(fname)
349
350     def _output_bytes(s):
351         if not isinstance(s, bytes):
352             s = s.encode(g.default_encoding, "replace")
353         outfile.write(s)
354
355     output = _output_bytes
356     if save_headers:
357         output_headers(msg)
358     if save_body:
359         output(outstring)
360     outfile.close()
361     output = save_output
362
363
364 def decode_part(msg):
365     "Decode one part of the message"
366
367     decode_headers(msg)
368
369     # Test all mask lists and find what to do with this content type
370     masks = []
371     ctype = msg.get_content_type()
372     if ctype:
373         masks.append(ctype)
374         mtype = ctype.split('/')[0]
375         masks.append(mtype + '/*')
376     masks.append('*/*')
377
378     left_binary = False
379     for content_type in masks:
380         if content_type in g.totext_mask or \
381            content_type in g.decoded_binary_mask:
382             break
383         elif content_type in g.binary_mask:
384             left_binary = True
385             break
386         elif content_type in g.fully_ignore_mask:
387             return
388
389     encoding = msg["Content-Transfer-Encoding"]
390     if left_binary or encoding in (None, '', '7bit', '8bit', 'binary'):
391         outstring = msg.get_payload()
392     else:  # Decode from transfer ecoding to text or binary form
393         outstring = msg.get_payload(decode=1)
394         set_header(msg, "Content-Transfer-Encoding", "8bit")
395         msg["X-MIME-Autoconverted"] = \
396             "from %s to 8bit by %s id %s" % (encoding, g.host_name, me)
397
398     for content_type in masks:
399         if content_type in g.totext_mask:
400             outstring = totext(msg, outstring)
401             break
402         elif content_type in g.binary_mask or \
403                 content_type in g.decoded_binary_mask:
404             output_headers(msg)
405             output(outstring)
406             break
407         elif content_type in g.ignore_mask:
408             output_headers(msg)
409             output("%sMessage body of type %s skipped.%s"
410                    % (os.linesep, ctype, os.linesep))
411             break
412         elif content_type in g.error_mask:
413             break
414     else:
415         # Neither content type nor masks were listed - decode by default
416         outstring = totext(msg, outstring)
417
418     for content_type in masks:
419         if content_type in g.save_headers_mask:
420             _save_message(msg, outstring, save_headers=True, save_body=False)
421         if content_type in g.save_body_mask:
422             _save_message(msg, outstring, save_headers=False, save_body=True)
423         if content_type in g.save_message_mask:
424             _save_message(msg, outstring, save_headers=True, save_body=True)
425
426     for content_type in masks:
427         if content_type in g.error_mask:
428             raise ValueError("content type %s prohibited" % ctype)
429
430
431 def decode_multipart(msg):
432     "Decode multipart"
433
434     decode_headers(msg)
435     boundary = msg.get_boundary()
436
437     masks = []
438     ctype = msg.get_content_type()
439     if ctype:
440         masks.append(ctype)
441         mtype = ctype.split('/')[0]
442         masks.append(mtype + '/*')
443     masks.append('*/*')
444
445     for content_type in masks:
446         if content_type in g.fully_ignore_mask:
447             return
448         elif content_type in g.ignore_mask:
449             output_headers(msg)
450             output("%sMessage body of type %s skipped.%s"
451                    % (os.linesep, ctype, os.linesep))
452             if boundary:
453                 output("%s--%s--%s" % (os.linesep, boundary, os.linesep))
454             return
455
456     for content_type in masks:
457         if content_type in g.save_body_mask or \
458                 content_type in g.save_message_mask:
459             _out_l = []
460             first_subpart = True
461             for subpart in msg.get_payload():
462                 if first_subpart:
463                     first_subpart = False
464                 else:
465                     _out_l.append(os.linesep)
466                 _out_l.append("--%s%s" % (boundary, os.linesep))
467                 _out_l.append(subpart.as_string())
468             _out_l.append("%s--%s--%s" % (os.linesep, boundary, os.linesep))
469             outstring = ''.join(_out_l)
470             break
471     else:
472         outstring = None
473
474     for content_type in masks:
475         if content_type in g.save_headers_mask:
476             _save_message(msg, outstring, save_headers=True, save_body=False)
477         if content_type in g.save_body_mask:
478             _save_message(msg, outstring, save_headers=False, save_body=True)
479         if content_type in g.save_message_mask:
480             _save_message(msg, outstring, save_headers=True, save_body=True)
481
482     for content_type in masks:
483         if content_type in g.error_mask:
484             raise ValueError("content type %s prohibited" % ctype)
485
486     output_headers(msg)
487
488     # Preserve the first part, it is probably not a RFC822-message.
489     if msg.preamble:
490         # Usually it is just a few lines of text (MIME warning).
491         output(msg.preamble)
492     if msg.preamble is not None:
493         output(os.linesep)
494
495     first_subpart = True
496     for subpart in msg.get_payload():
497         if boundary:
498             if first_subpart:
499                 first_subpart = False
500             else:
501                 output(os.linesep)
502             output("--%s%s" % (boundary, os.linesep))
503
504         # Recursively decode all parts of the subpart
505         decode_message(subpart)
506
507     if boundary:
508         output("%s--%s--%s" % (os.linesep, boundary, os.linesep))
509
510     if msg.epilogue:
511         output(msg.epilogue)
512
513
514 def decode_message(msg):
515     "Decode message"
516
517     if msg.is_multipart():
518         decode_multipart(msg)
519     elif len(msg):  # Simple one-part message (there are headers) - decode it
520         decode_part(msg)
521     else:  # Not a message, just text - copy it literally
522         output(msg.as_string())
523
524
525 def open_output_file(filename):
526     fullpath = os.path.abspath(os.path.join(g.destination_dir, filename))
527     full_dir = os.path.dirname(fullpath)
528     create = not os.path.isdir(full_dir)
529     if create:
530         os.makedirs(full_dir)
531     try:
532         return open(fullpath, 'wb')
533     except Exception:
534         if create:
535             shutil.rmtree(full_dir)
536         raise