utils.py 9.13 KB
Newer Older
Jerome Mariette's avatar
Jerome Mariette committed
1
#
Jerome Mariette's avatar
Jerome Mariette committed
2
# Copyright (C) 2015 INRA
Jerome Mariette's avatar
Jerome Mariette committed
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

18
import re
Jerome Mariette's avatar
Jerome Mariette committed
19
import sys
20
21
import smtplib
import socket
22
import math
Jerome Mariette's avatar
Jerome Mariette committed
23
import shutil
24
25
26
27
28
29
30
31

try:
    import DNS
    ServerError = DNS.ServerError
except:
    DNS = None
    class ServerError(Exception): pass

Jerome Mariette's avatar
Jerome Mariette committed
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def robust_rmtree(path, logger=None, max_retries=6):
    """Robustly tries to delete paths.
    Retries several times (with increasing delays) if an OSError
    occurs.  If the final attempt fails, the Exception is propagated
    to the caller.
    """
    dt = 1
    for i in range(max_retries):
        try:
            shutil.rmtree(path)
            return
        except OSError:
            if logger:
                logger.info('Unable to remove path: %s' % path)
                logger.info('Retrying after %d seconds' % dt)
            time.sleep(dt)
            dt *= 2

    # Final attempt, pass any Exceptions up to caller.
    shutil.rmtree(path)

Jerome Mariette's avatar
Jerome Mariette committed
53
def display_error_message(msg):
Floreal Cabanettes's avatar
Floreal Cabanettes committed
54
    sys.stderr.write("\033[91mError: "+msg+"\033[0m\n")
Jerome Mariette's avatar
Jerome Mariette committed
55
56
57
58
59
60
    sys.exit(1)

def display_info_message(msg, with_exit=False):
    sys.stderr.write("\033[93mInfo: "+msg+"\n\033[0m")
    if with_exit: sys.exit(1)

61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def which(program):
    """
    Return if the asked program exist in the user path
      @param options : the options asked by the user
    """
    import os
    def is_exe(fpath):
        return os.path.exists(fpath) and os.access(fpath, os.X_OK)
    fpath, fname = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file
    return None

Jerome Mariette's avatar
Jerome Mariette committed
80
def get_file_base (file):
Jerome Mariette's avatar
Jerome Mariette committed
81
    """
Jerome Mariette's avatar
Jerome Mariette committed
82
83
    Return the file base
      @param file: input file
Jerome Mariette's avatar
Jerome Mariette committed
84
    """
Jerome Mariette's avatar
Jerome Mariette committed
85
86
    # First import required libraries
    import os
Jerome Mariette's avatar
Jerome Mariette committed
87
    
Jerome Mariette's avatar
Jerome Mariette committed
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
    file_base = os.path.splitext(os.path.basename(file))[0]
    if os.path.splitext(file)[1] == ".gz":
        file_base = os.path.splitext(file_base)[0]
    return file_base

def get_nb_string(value, length=6):  
    """
    Return a string of n caracters.
      @param value: the value to get representation
      @param length: the final length required
    """
    zeros = ""
    s_value = str(value)
    for i in range(length - len(s_value)):
        zeros += "0";
    s_value = zeros + s_value
    return s_value
Jerome Mariette's avatar
Jerome Mariette committed
105

106
107
def get_nb_octet(size):
    """
jmariette's avatar
jmariette committed
108
    Return the number of Byte : value has to be formated like this: 5MB, 20GB ...
109
    """
jmariette's avatar
jmariette committed
110
    octets_link = ["Bytes", "kB", "MB", "GB", "TB", "PB", "EB", "ZB"]
111
    if size.endswith("bytes"):
jmariette's avatar
jmariette committed
112
        unit = "Bytes"
113
114
115
116
        isize = size[:len(size)-5]
    else:
        unit = size[len(size)-2:len(size)]
        isize = size[:len(size)-2]
117
118
    pow_val = int(octets_link.index(unit)) * 10
    val = pow(2, pow_val)
119
    nb_octet = float(isize) * val
120
121
122
123
    return nb_octet

def get_octet_string_representation(size):
    """
jmariette's avatar
jmariette committed
124
    Return the string representation of a Byte
125
    """
jmariette's avatar
jmariette committed
126
    octets_link = ["Bytes", "kB", "MB", "GB", "TB", "PB", "EB", "ZB"]
127
128
129
130
131
132
133
134
135
136
137
138
    p = int(math.ceil(float(len(str(size)))/float(3) - float(1)))
    pow_needed = p * 10
    pow_needed = pow(2, pow_needed)
    value = str(float(size)/float(pow_needed))
    tmp = value.split(".")
    value = tmp[0] + "." + tmp[1][:2]
    try:
        value = value + " " + octets_link[p]
    except:
        raise TypeError("In core.common:project_id unexpected input value for size: " + str(size))
    return str(value)

Jerome Mariette's avatar
Jerome Mariette committed
139
140
141
142
143
144
145
146
147
148
149
150
def get_argument_pattern( list, start_number=1 ):
    """
    Return the argument pattern for arguments files. Ex : with 3 element in list this function returns ["${1} ${2} ${3}", 4].
      @param list : the list of files.
      @param start_number : number of the first argument.
    """
    arg_pattern = ""
    next_number = start_number
    for elt in list:
        arg_pattern += ' ${' + str(next_number) + '}'
        next_number += 1
        
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
    return [arg_pattern, next_number]

# All we are really doing is comparing the input string to one
# gigantic regular expression.  But building that regexp, and
# ensuring its correctness, is made much easier by assembling it
# from the "tokens" defined by the RFC.  Each of these tokens is
# tested in the accompanying unit test file.
#
# The section of RFC 2822 from which each pattern component is
# derived is given in an accompanying comment.
#
# (To make things simple, every string below is given as 'raw',
# even when it's not strictly necessary.  This way we don't forget
# when it is necessary.)
#
WSP = r'[ \t]'                                       # see 2.2.2. Structured Header Field Bodies
CRLF = r'(?:\r\n)'                                   # see 2.2.3. Long Header Fields
NO_WS_CTL = r'\x01-\x08\x0b\x0c\x0f-\x1f\x7f'        # see 3.2.1. Primitive Tokens
QUOTED_PAIR = r'(?:\\.)'                             # see 3.2.2. Quoted characters
FWS = r'(?:(?:' + WSP + r'*' + CRLF + r')?' + \
            WSP + r'+)'                                    # see 3.2.3. Folding white space and comments
CTEXT = r'[' + NO_WS_CTL + \
                r'\x21-\x27\x2a-\x5b\x5d-\x7e]'              # see 3.2.3
CCONTENT = r'(?:' + CTEXT + r'|' + \
                     QUOTED_PAIR + r')'                        # see 3.2.3 (NB: The RFC includes COMMENT here
                                                                                                         # as well, but that would be circular.)
COMMENT = r'\((?:' + FWS + r'?' + CCONTENT + \
                    r')*' + FWS + r'?\)'                       # see 3.2.3
CFWS = r'(?:' + FWS + r'?' + COMMENT + ')*(?:' + \
             FWS + '?' + COMMENT + '|' + FWS + ')'         # see 3.2.3
ATEXT = r'[\w!#$%&\'\*\+\-/=\?\^`\{\|\}~]'           # see 3.2.4. Atom
ATOM = CFWS + r'?' + ATEXT + r'+' + CFWS + r'?'      # see 3.2.4
DOT_ATOM_TEXT = ATEXT + r'+(?:\.' + ATEXT + r'+)*'   # see 3.2.4
DOT_ATOM = CFWS + r'?' + DOT_ATOM_TEXT + CFWS + r'?' # see 3.2.4
QTEXT = r'[' + NO_WS_CTL + \
                r'\x21\x23-\x5b\x5d-\x7e]'                   # see 3.2.5. Quoted strings
QCONTENT = r'(?:' + QTEXT + r'|' + \
                     QUOTED_PAIR + r')'                        # see 3.2.5
QUOTED_STRING = CFWS + r'?' + r'"(?:' + FWS + \
                                r'?' + QCONTENT + r')*' + FWS + \
                                r'?' + r'"' + CFWS + r'?'
LOCAL_PART = r'(?:' + DOT_ATOM + r'|' + \
                         QUOTED_STRING + r')'                    # see 3.4.1. Addr-spec specification
DTEXT = r'[' + NO_WS_CTL + r'\x21-\x5a\x5e-\x7e]'    # see 3.4.1
DCONTENT = r'(?:' + DTEXT + r'|' + \
                     QUOTED_PAIR + r')'                        # see 3.4.1
DOMAIN_LITERAL = CFWS + r'?' + r'\[' + \
                                 r'(?:' + FWS + r'?' + DCONTENT + \
                                 r')*' + FWS + r'?\]' + CFWS + r'?'  # see 3.4.1
DOMAIN = r'(?:' + DOT_ATOM + r'|' + \
                 DOMAIN_LITERAL + r')'                       # see 3.4.1
ADDR_SPEC = LOCAL_PART + r'@' + DOMAIN               # see 3.4.1

# A valid address will match exactly the 3.4.1 addr-spec.
VALID_ADDRESS_REGEXP = '^' + ADDR_SPEC + '$'

def validate_email(email, check_mx=False,verify=False):

    """Indicate whether the given string is a valid email address
    according to the 'addr-spec' portion of RFC 2822 (see section
    3.4.1).  Parts of the spec that are marked obsolete are *not*
    included in this test, and certain arcane constructions that
    depend on circular definitions in the spec may not pass, but in
    general this should correctly identify any email address likely
    to be in use as of 2011."""
    try:
        assert re.match(VALID_ADDRESS_REGEXP, email) is not None
        check_mx |= verify
        if check_mx:
            if not DNS: raise Exception('For check the mx records or check if the email exists you must have installed pyDNS python package')
            DNS.DiscoverNameServers()
            hostname = email[email.find('@')+1:]
            mx_hosts = DNS.mxlookup(hostname)
            for mx in mx_hosts:
                try:
                    smtp = smtplib.SMTP()
                    smtp.connect(mx[1])
                    if not verify: return True
                    status, _ = smtp.helo()
                    if status != 250: continue
                    smtp.mail('')
                    status, _ = smtp.rcpt(email)
                    if status != 250: return False
                    break
                except smtplib.SMTPServerDisconnected: #Server not permits verify user
                    break
                except smtplib.SMTPConnectError:
                    continue
    except (AssertionError, ServerError): 
        return False
jmariette's avatar
jmariette committed
241
    return True