jflow_server.py 15.1 KB
Newer Older
Jerome Mariette's avatar
Jerome Mariette committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#
# Copyright (C) 2012 INRA
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import cherrypy
Jerome Mariette's avatar
Jerome Mariette committed
19
20
import cgi
import tempfile
Jerome Mariette's avatar
Jerome Mariette committed
21
22
23
24
25
import json
import sys
from functools import wraps
import time
import os
26
import argparse
Jerome Mariette's avatar
Jerome Mariette committed
27
import logging
Jerome Mariette's avatar
Jerome Mariette committed
28

Jerome Mariette's avatar
Jerome Mariette committed
29
30
from argparse import ArgumentTypeError

Jerome Mariette's avatar
Jerome Mariette committed
31
32
33
34
35
try:
    import _preamble
except ImportError:
    sys.exc_clear()

36
import jflow
Jerome Mariette's avatar
Jerome Mariette committed
37
from jflow.workflows_manager import WorkflowsManager
38
from jflow.config_reader import JFlowConfigReader
Jerome Mariette's avatar
Jerome Mariette committed
39
from jflow.workflow import Workflow
Jerome Mariette's avatar
Jerome Mariette committed
40
from workflows.types import *
Jerome Mariette's avatar
Jerome Mariette committed
41
42
43
import jflow.utils as utils
from cctools.util import time_format

44
# Field storage in charge of uploading large files straight to disk.
class UploadFieldStorage(cgi.FieldStorage):
    """cgi.FieldStorage variant that spools uploaded files to named temp files.

    The stock implementation uses an anonymous temporary file. Using a
    *named* temporary file, created in the jflow temporary directory, keeps
    it visible on disk, so the caller can add a second hard link to it once
    the upload is done instead of copying the data to its final file name.
    """

    def get_tmp_directory(self):
        """Return the jflow temporary directory from the jflow configuration."""
        jflowconf = JFlowConfigReader()
        return jflowconf.get_tmp_directory()

    def make_file(self, binary=None):
        """Return a NamedTemporaryFile located in the jflow temporary directory.

        ``binary`` is only accepted for compatibility with
        ``cgi.FieldStorage.make_file`` and is ignored. The temporary
        directory, including any missing parent directory, is created on
        demand.
        """
        tmp_folder = self.get_tmp_directory()
        if not os.path.exists(tmp_folder):
            try:
                os.makedirs(tmp_folder)
            except OSError:
                # Best effort: a concurrent request may have created the
                # directory in the meantime; any real failure (permissions,
                # ...) is reported by NamedTemporaryFile below.
                pass
        return tempfile.NamedTemporaryFile(dir=tmp_folder)


Jerome Mariette's avatar
Jerome Mariette committed
63
64
65
66
67
def noBodyProcess():
    """CherryPy hook that turns off automatic request-body processing.

    With ``process_request_body`` disabled CherryPy leaves the raw body
    unread, which gives the handler direct control over where an uploaded
    file is written (to disk rather than the default in-memory loading).
    """
    request = cherrypy.request
    request.process_request_body = False


# Expose the hook as ``cherrypy.tools.noBodyProcess`` so handlers can enable it
# with a decorator; it runs at the 'before_request_body' hook point.
cherrypy.tools.noBodyProcess = cherrypy.Tool('before_request_body', noBodyProcess)
70
71
72
73
74
75
76

# Cross-domain call support: handlers decorated with cherrypy.tools.CORS()
# receive permissive Access-Control-* response headers.
def CORS():
    """Add CORS headers allowing any origin to use OPTIONS, GET and POST."""
    response_headers = cherrypy.response.headers
    response_headers['Access-Control-Allow-Origin'] = '*'
    response_headers['Access-Control-Allow-Methods'] = 'OPTIONS, GET, POST'
    response_headers['Access-Control-Allow-Headers'] = 'Content-Type, Content-Range, Content-Disposition'


cherrypy.tools.CORS = cherrypy.Tool('before_finalize', CORS)
Jerome Mariette's avatar
Jerome Mariette committed
77
78


Jerome Mariette's avatar
Jerome Mariette committed
79
80
class JFlowServer (object):
    """CherryPy application exposing the jflow workflows manager over HTTP.

    Methods decorated with ``jsonify`` return JSON, or JSONP when the request
    carries a ``callback`` parameter.
    """

    # Separator used in form field names to address a sub-parameter of a
    # MultipleParameters parameter: "<param name>._.<sub-param flag>".
    MULTIPLE_TYPE_SPLITER = "._."
    # URL path segment under which the work directory is served statically
    # (see the staticdir configuration in the __main__ section).
    JFLOW_WDATA = "data"

    def __init__(self):
        # Create a workflow manager to get access to our workflows
        self.wfmanager = WorkflowsManager()
        self.jflow_config_reader = JFlowConfigReader()

    def jsonify(func):
        '''JSON and JSONP decorator for CherryPy.

        The decorated handler's return value is serialized with json.dumps.
        When a ``callback`` parameter is supplied, the payload is wrapped as
        ``callback(<json>)`` and served as JavaScript (JSONP). Because the
        callback name is echoed into the response body, it is restricted to
        identifier-like characters (letters, digits, "_", "$", ".") to prevent
        script injection; any other value is rejected with HTTP 400 before
        the handler runs.
        '''
        @wraps(func)
        def wrapper(*args, **kw):
            callback = kw.get("callback")
            if callback is not None:
                if not callback or not all(c.isalnum() or c in "_$." for c in callback):
                    raise cherrypy.HTTPError(400, "Invalid JSONP callback name")
            value = func(*args, **kw)
            payload = json.dumps(value)
            if callback is None:
                cherrypy.response.headers["Content-Type"] = "application/json"
                return payload
            # JSONP responses are executed as scripts by the client
            cherrypy.response.headers["Content-Type"] = "application/javascript"
            return callback + "(" + payload + ")"
        return wrapper

    def jsonify_workflow_status(self, workflow, init_to_zero=False):
        """Build a JSON-serializable status summary of ``workflow``.

        :param workflow: the workflow object to describe.
        :param init_to_zero: when True, report the workflow as just started
            (status Workflow.STATUS_STARTED, no components, no metadata)
            instead of querying the status of each component.
        :return: a dict with id, name, status, timing and component entries;
            missing times are reported as "-".
        """
        if workflow.start_time:
            start_time = time.asctime(time.localtime(workflow.start_time))
        else:
            start_time = "-"
        if workflow.start_time and workflow.end_time:
            elapsed_time = str(workflow.end_time - workflow.start_time)
        elif workflow.start_time:
            # still running: elapsed time up to now
            elapsed_time = str(time.time() - workflow.start_time)
        else:
            elapsed_time = "-"
        if workflow.end_time:
            end_time = time.asctime(time.localtime(workflow.end_time))
        else:
            end_time = "-"

        if init_to_zero:
            return {"id": utils.get_nb_string(workflow.id),
                    "name": workflow.name,
                    "status": Workflow.STATUS_STARTED,
                    "elapsed_time": elapsed_time,
                    "start_time": start_time,
                    "end_time": end_time,
                    "components": []}

        components = []
        for component in workflow.get_components_nameid():
            status_info = workflow.get_component_status(component)
            components.append({"name": component,
                               "elapsed_time": time_format(status_info["time"]),
                               "total": status_info["tasks"],
                               "waiting": status_info["waiting"],
                               "failed": status_info["failed"],
                               "running": status_info["running"],
                               "aborted": status_info["aborted"],
                               "completed": status_info["completed"]})
        return {"id": utils.get_nb_string(workflow.id),
                "name": workflow.name,
                "metadata": workflow.metadata,
                "status": workflow.get_status(),
                "elapsed_time": elapsed_time,
                "start_time": start_time,
                "end_time": end_time,
                "components": components}

    @staticmethod
    def _action_name(action):
        """Return a JSON-serializable name for a parameter action.

        The jflow multiple actions are classes, so they are replaced by their
        name; any other action is returned unchanged.
        """
        if action == jflow.parameter.MiltipleAction:
            return "MiltipleAction"
        if action == jflow.parameter.MiltipleAppendAction:
            return "MiltipleAppendAction"
        return action

    @cherrypy.expose
    @jsonify
    def get_available_workflows(self, **kwargs):
        """Describe every available workflow and its parameters.

        :return: a list of dicts (name, help, class, parameters,
            parameters_per_groups, groups), one per available workflow.
        """
        workflows = []
        for instance in self.wfmanager.get_available_workflows():
            parameters, parameters_per_groups, groups = [], {}, ["default"]
            for param in instance.parameters:
                hash_param = {"help": param.help,
                              "required": param.required,
                              "default": param.default,
                              "choices": param.choices,
                              "action": self._action_name(param.action),
                              "type": param.get_type(),
                              "name": param.name,
                              "display_name": param.display_name,
                              "group": param.group}
                # if it's a multiple type add sub parameters; their names
                # encode the parent parameter name and the sub-parameter flag
                if type(param.type) == jflow.parameter.MultipleParameters:
                    hash_param["sub_parameters"] = [
                        {"help": sub_param.help,
                         "required": sub_param.required,
                         "default": sub_param.default,
                         "choices": sub_param.choices,
                         "action": self._action_name(sub_param.action),
                         "type": sub_param.get_type(),
                         "name": param.name + JFlowServer.MULTIPLE_TYPE_SPLITER + sub_param.flag,
                         "display_name": sub_param.display_name,
                         "group": param.group}
                        for sub_param in param.sub_parameters]
                parameters.append(hash_param)
                parameters_per_groups.setdefault(param.group, []).append(hash_param)
                groups.append(param.group)
            workflows.append({"name": instance.name,
                              "help": instance.description,
                              "class": instance.__class__.__name__,
                              "parameters": parameters,
                              "parameters_per_groups": parameters_per_groups,
                              "groups": list(set(groups))})
        return workflows

    @cherrypy.expose
    @jsonify
    def run_workflow(self, **kwargs):
        """Start the workflow named by the ``workflow_class`` parameter.

        Fields named "<param>._.<flag>" are regrouped as a list of
        (flag, value) tuples under "<param>"; field names containing the
        separator more than once are ignored.

        :return: {"status": 0, "content": <initial workflow status>} on
            success, {"status": 1, "content": <error message>} on any error.
        """
        try:
            kwargs_modified = {}
            for key, value in kwargs.items():
                parts = key.split(JFlowServer.MULTIPLE_TYPE_SPLITER)
                if len(parts) == 1:
                    kwargs_modified[key] = value
                elif len(parts) == 2:
                    kwargs_modified.setdefault(parts[0], []).append((parts[1], value))
            workflow = self.wfmanager.run_workflow(kwargs_modified["workflow_class"], kwargs_modified)
            return {"status": 0, "content": self.jsonify_workflow_status(workflow, True)}
        except Exception as err:
            return {"status": 1, "content": str(err)}

    @cherrypy.expose
    @jsonify
    def rerun_workflow(self, **kwargs):
        """Rerun the workflow ``workflow_id`` and return its status."""
        self.wfmanager.rerun_workflow(kwargs["workflow_id"])
        workflow = self.wfmanager.get_workflow(kwargs["workflow_id"])
        return self.jsonify_workflow_status(workflow)

    @cherrypy.expose
    def uploadLight(self, **kwargs):
        """Store an uploaded file as "<prefix>_<filename>" in the tmp directory.

        The optional ``prefix`` field names the destination; the other field
        is the uploaded file (if several are sent, the last one iterated is
        used). Directory components are stripped from both the prefix and the
        client file name so the file cannot be written outside the tmp
        directory.
        """
        prefix = ""
        file_param = None
        for key in kwargs.keys():
            if key == "prefix":
                prefix = kwargs[key]
            else:
                file_param = key
        if file_param is None:
            raise cherrypy.HTTPError(400, "No file field found in the upload request")

        # the file transfer can take a long time; by default cherrypy
        # limits responses to 300s; we increase it to 1h
        cherrypy.response.timeout = 3600

        uploaded = kwargs[file_param]
        file_name = os.path.basename(prefix) + "_" + \
            os.path.basename(uploaded.filename.encode('ascii', 'ignore'))
        destination = os.path.join(self.jflow_config_reader.get_tmp_directory(), file_name)
        # upload file by chunks; binary mode since the payload is raw bytes
        with open(destination, "wb") as server_file:
            while True:
                data = uploaded.file.read(8192)
                if not data:
                    break
                server_file.write(data)

    @cherrypy.expose
    @cherrypy.tools.noBodyProcess()
    @cherrypy.tools.CORS()
    def upload(self):
        """Stream multipart uploads to disk and link them into the tmp directory.

        Each non-``prefix`` form field is expected to be a file; it is hard
        linked as "<prefix>_<filename>" in the jflow tmp directory (prefix
        defaults to "" as in uploadLight). Directory components are stripped
        from the prefix and the client file name.
        """
        # the file transfer can take a long time; by default cherrypy
        # limits responses to 300s; we increase it to 1h
        cherrypy.response.timeout = 3600

        # convert the header keys to lower case
        lcHDRS = dict((key.lower(), val) for key, val in cherrypy.request.headers.items())

        # at this point we could limit the upload on content-length...
        # incomingBytes = int(lcHDRS['content-length'])

        # create our version of cgi.FieldStorage to parse the MIME encoded
        # form data where the file is contained
        formFields = UploadFieldStorage(fp=cherrypy.request.rfile,
                                        headers=lcHDRS,
                                        environ={'REQUEST_METHOD': 'POST'},
                                        keep_blank_values=True)

        prefix = os.path.basename(formFields.getvalue("prefix", ""))
        tmp_directory = formFields.get_tmp_directory()

        # we now create a link to the file, using the submitted
        # filename; if we renamed, there would be a failure because
        # the NamedTemporaryFile, used by our version of cgi.FieldStorage,
        # explicitly deletes the original filename
        for current in formFields.keys():
            if current == 'prefix':
                continue
            currentFile = formFields[current]
            file_name = prefix + "_" + os.path.basename(currentFile.filename.encode('ascii', 'ignore'))
            os.link(currentFile.file.name, os.path.join(tmp_directory, file_name))

    @cherrypy.expose
    @jsonify
    def get_workflows_status(self, **kwargs):
        """Return the status of one workflow or of all matching workflows.

        With ``workflow_id`` only that workflow's status is returned.
        Otherwise the statuses of all workflows are returned, restricted,
        when ``metadata_filter`` (comma separated) is given, to workflows
        having at least one of those metadata values.
        """
        if "workflow_id" in kwargs:
            workflow = self.wfmanager.get_workflow(kwargs["workflow_id"])
            return self.jsonify_workflow_status(workflow)

        wanted_metadata = None
        if "metadata_filter" in kwargs:
            wanted_metadata = kwargs["metadata_filter"].split(",")
        status = []
        for workflow in self.wfmanager.get_workflows():
            if wanted_metadata is None or any(wf_meta in wanted_metadata for wf_meta in workflow.metadata):
                status.append(self.jsonify_workflow_status(workflow))
        return status

    @cherrypy.expose
    @jsonify
    def get_workflow_errors(self, **kwargs):
        """Return the errors reported by the manager for ``workflow_id``."""
        return self.wfmanager.get_workflow_errors(kwargs["workflow_id"])

    @cherrypy.expose
    @jsonify
    def get_workflow_outputs(self, **kwargs):
        """Return the outputs of ``workflow_id``, relative to the JFLOW_WDATA path."""
        return self.wfmanager.get_workflow_ouputs(JFlowServer.JFLOW_WDATA, kwargs["workflow_id"])

    @cherrypy.expose
    @jsonify
    def validate_field(self, **kwargs):
        """Check a field value against the type named by the ``type`` parameter.

        The value is the first parameter other than ``type``, ``callback`` and
        ``_`` (the latter two are added by JSONP clients).

        :return: True if the type accepts the value, otherwise the error
            message raised by the type.
        """
        try:
            value_key = None
            for key in kwargs.keys():
                if key not in ("type", "callback", "_"):
                    value_key = key
                    break
            # SECURITY(review): kwargs["type"] comes straight from the HTTP
            # request and is passed to eval(); a crafted value can execute
            # arbitrary code on the server. It should be resolved from an
            # explicit whitelist of jflow types instead.
            eval(kwargs["type"])(kwargs[value_key])
            return True
        except Exception as e:
            return str(e)

Jerome Mariette's avatar
Jerome Mariette committed
332
if __name__ == '__main__':

    # command line: only a daemon switch is supported
    parser = argparse.ArgumentParser()
    parser.add_argument("--daemon", action="store_true", dest="daemon", default=False, help="Run the server as daemon")
    options = parser.parse_args()

    # detach the CherryPy engine from the terminal when asked to
    if options.daemon:
        from cherrypy.process.plugins import Daemonizer
        Daemonizer(cherrypy.engine).subscribe()

    # the socket host and port come from the jflow configuration
    jflowconf = JFlowConfigReader()
    socket_opts = jflowconf.get_socket_options()

    # a max_request_body_size of 0 disables CherryPy's limit on the request
    # body size (100MB by default) so large files can be uploaded
    cherrypy.server.max_request_body_size = 0

    # raise the socket timeout from CherryPy's default of 10s to 60s, to be
    # more tolerant of poor quality client-server connections
    cherrypy.server.socket_timeout = 60

    # serve the work directory as static content under /<JFLOW_WDATA>
    app_conf = {'/': {'tools.staticdir.root': jflowconf.get_work_directory()}}
    data_conf = {'tools.staticdir.on': True,
                 'tools.staticdir.dir': jflowconf.get_work_directory()}
    app_conf[os.path.join('/', JFlowServer.JFLOW_WDATA)] = data_conf

    server_conf = {'server.socket_host': socket_opts[0],
                   'server.socket_port': socket_opts[1]}
    cherrypy.config.update(server_conf)

    # start the server
    cherrypy.quickstart(JFlowServer(), config=app_conf)