#
# Copyright (C) 2012 INRA
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import cherrypy
import cgi
import tempfile
import json
import sys
import datetime
from functools import wraps
import time
import datetime
import os
import argparse
import logging

from argparse import ArgumentTypeError

try:
    import _preamble
except ImportError:
    sys.exc_clear()

import jflow
from jflow.workflows_manager import WorkflowsManager
from jflow.config_reader import JFlowConfigReader
from jflow.workflow import Workflow
from jflow.parameter import browsefile, localfile, urlfile, inputfile, create_test_function
from workflows.types import *
import jflow.utils as utils
from cctools.util import time_format

# field storage class in charge of uploading large files
class UploadFieldStorage(cgi.FieldStorage):
    """cgi.FieldStorage variant that spools uploads to a *named* temporary file.

    The default implementation uses a non-named file; keeping the file
    visible (named) allows us to create a 2nd link after the upload is
    done, thus avoiding the overhead of making a copy to the destination
    filename.
    """

    def get_tmp_directory(self):
        """Return the temporary directory given by the jflow configuration."""
        jflowconf = JFlowConfigReader()
        return jflowconf.get_tmp_directory()

    def make_file(self, binary=None):
        """Create the named temporary file that receives an uploaded field.

        The temporary directory is created first when it does not exist.

        Args:
            binary: accepted for signature compatibility with the parent
                class; it is not used here.

        Returns:
            A tempfile.NamedTemporaryFile located in the temporary directory.
        """
        tmp_folder = self.get_tmp_directory()
        if not os.path.exists(tmp_folder):
            try:
                os.mkdir(tmp_folder)
            except OSError:
                # Best effort only: another request may have created the
                # folder in the meantime. If the folder is really missing,
                # NamedTemporaryFile below raises and reports the problem.
                pass
        return tempfile.NamedTemporaryFile(dir=tmp_folder)


def noBodyProcess():
    """Switch off CherryPy's own processing of the request body.

    Setting cherrypy.request.process_request_body to False gives the
    handler direct control of the file upload destination: instead of
    letting cherrypy load the body (in memory by default), the handler
    directs it to disk.
    """
    current_request = cherrypy.request
    current_request.process_request_body = False

# register the function as a tool attached to the 'before_request_body' hook
cherrypy.tools.noBodyProcess = cherrypy.Tool('before_request_body', noBodyProcess)

# define the function in charge of handling cross-domain calls
def CORS():
    """Add the cross-origin (CORS) headers to the current response."""
    cors_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'OPTIONS, GET, POST'),
        ('Access-Control-Allow-Headers', 'Content-Type, Content-Range, Content-Disposition'),
    )
    for header_name, header_value in cors_headers:
        cherrypy.response.headers[header_name] = header_value
# register the function as a tool attached to the 'before_finalize' hook
cherrypy.tools.CORS = cherrypy.Tool('before_finalize', CORS)

class JFlowJSONEncoder (json.JSONEncoder):
    """JSON encoder that renders dates with the date format of the jflow configuration."""

    def default(self, obj):
        """Serialize date/datetime objects as strings.

        Any other object is handed to json.JSONEncoder.default.
        """
        if not isinstance(obj, (datetime.date, datetime.datetime)):
            return json.JSONEncoder.default(self, obj)
        date_format = JFlowConfigReader().get_date_format()
        return obj.strftime(date_format)

class JFlowServer (object):

    # separator placed between a multiple parameter name and the flag of one
    # of its sub parameters when building / parsing request parameter names
    MULTIPLE_TYPE_SPLITER = "___"
    # URL path segment under which the work directory is served as static
    # content, and which replaces the work directory in output file URLs
    JFLOW_WDATA = "data"

    def __init__(self):
        """Create the workflows manager and the configuration reader used by the handlers."""
        # Create a workflow manager to get access to our workflows
        self.wfmanager = WorkflowsManager()
        self.jflow_config_reader = JFlowConfigReader()

    def jsonify(func):
        '''JSON and JSONP decorator for CherryPy.

        The value returned by the decorated handler is serialized with
        JFlowJSONEncoder and the response Content-Type is set to
        "application/json". When the request provides a "callback"
        parameter, the JSON text is wrapped in a call to that callback
        (JSONP); otherwise the JSON text is returned as is.
        '''
        @wraps(func)
        def wrapper(*args, **kw):
            value = func(*args, **kw)
            cherrypy.response.headers["Content-Type"] = "application/json"
            # use the same encoder for both outputs so that date values are
            # serialized for plain JSON requests as well as for JSONP ones
            payload = json.dumps(value, cls=JFlowJSONEncoder)
            # if JSONP request
            if "callback" in kw:
                # NOTE(review): the callback name comes straight from the
                # request and is not validated before being echoed back
                return kw["callback"] + "(" + payload + ")"
            # else return the JSON
            return payload
        return wrapper

    def jsonify_workflow_status(self, workflow, init_to_zero=False):
        """Build the JSON-serializable status description of a workflow.

        Args:
            workflow: the workflow to describe. Its start_time and end_time
                are passed to time.localtime, so they are expected to be
                timestamps in seconds (or a false value when not set).
            init_to_zero: when True, return a reduced description with the
                Workflow.STATUS_STARTED status and no component, without
                querying the workflow components.

        Returns:
            A dict with the keys "id", "name", "status", "elapsed_time",
            "start_time", "end_time" and "components"; the full description
            also has "errors" and "metadata". Unset times are reported as
            "-". The elapsed time is a number of seconds in the reduced
            description and an H:MM:SS string in the full one.
        """
        started, ended = workflow.start_time, workflow.end_time
        start_time = time.asctime(time.localtime(started)) if started else "-"
        end_time = time.asctime(time.localtime(ended)) if ended else "-"
        # elapsed time in seconds, None when the workflow has not started yet
        if started and ended:
            elapsed_seconds = ended - started
        elif started:
            elapsed_seconds = time.time() - started
        else:
            elapsed_seconds = None

        if init_to_zero:
            return {"id": utils.get_nb_string(workflow.id),
                    "name": workflow.name,
                    "status": Workflow.STATUS_STARTED,
                    "elapsed_time": "-" if elapsed_seconds is None else str(elapsed_seconds),
                    "start_time": start_time,
                    "end_time": end_time,
                    "components": []}

        components = []
        for component in workflow.get_components_nameid():
            status_info = workflow.get_component_status(component)
            components.append({"name": component,
                               "elapsed_time": time_format(status_info["time"]),
                               "total": status_info["tasks"],
                               "waiting": status_info["waiting"],
                               "failed": status_info["failed"],
                               "running": status_info["running"],
                               "aborted": status_info["aborted"],
                               "completed": status_info["completed"]})

        # truncate to whole seconds on the number itself: going through the
        # string form fails on "-" and on floats printed with an exponent
        if elapsed_seconds is None:
            elapsed_time = "-"
        else:
            elapsed_time = str(datetime.timedelta(seconds=int(elapsed_seconds)))

        return {"id": utils.get_nb_string(workflow.id),
                "errors": workflow.get_errors(),
                "name": workflow.name,
                "metadata": workflow.metadata,
                "status": workflow.get_status(),
                "elapsed_time": elapsed_time,
                "start_time": start_time,
                "end_time": end_time,
                "components": components}

    @cherrypy.expose
    @jsonify
    def get_available_workflows(self, **kwargs):
        """Describe every available workflow and its parameters.

        Returns:
            A list with one dict per workflow instance given by the
            workflows manager, holding its "name", "help", "class", the
            list of its "parameters", the same parameters indexed by group
            ("parameters_per_groups") and the list of "groups" (always
            starting with "default", then in order of first appearance).
            Parameters and sub parameters of type "date" carry a "format"
            entry describing the expected date format.
        """
        workflows = []
        wf_instances, _wf_methods = self.wfmanager.get_available_workflows()
        for instance in wf_instances:
            parameters, parameters_per_groups, groups = [], {}, ["default"]
            for param in instance.get_parameters():
                # if it's a multiple action change the action by the name
                if param.action == jflow.parameter.MiltipleAction:
                    action = "MiltipleAction"
                elif param.action == jflow.parameter.MiltipleAppendAction:
                    action = "MiltipleAppendAction"
                else:
                    action = param.action
                hash_param = {"help": param.help,
                              "required": param.required,
                              "default": param.default,
                              "choices": param.choices,
                              "action": action,
                              "type": param.get_type(),
                              "name": param.name,
                              "display_name": param.display_name,
                              "group": param.group}
                if hash_param["type"] == "date":
                    hash_param["format"] = self._get_client_date_format()
                # if it's a multiple type add sub parameters
                if type(param.type) == jflow.parameter.MultipleParameters:
                    hash_param["sub_parameters"] = []
                    for sub_param in param.sub_parameters:
                        hash_sub_param = {"help": sub_param.help,
                                          "required": sub_param.required,
                                          "default": sub_param.default,
                                          "choices": sub_param.choices,
                                          "action": sub_param.action,
                                          "type": sub_param.get_type(),
                                          "name": param.name + JFlowServer.MULTIPLE_TYPE_SPLITER + sub_param.flag,
                                          "display_name": sub_param.display_name,
                                          "group": param.group}
                        # the format belongs to the sub parameter itself:
                        # testing the parent type here never had any effect
                        if hash_sub_param["type"] == "date":
                            hash_sub_param["format"] = self._get_client_date_format()
                        hash_param["sub_parameters"].append(hash_sub_param)
                parameters.append(hash_param)
                parameters_per_groups.setdefault(param.group, []).append(hash_param)
                # keep each group once, in order of first appearance
                if param.group not in groups:
                    groups.append(param.group)
            workflows.append({"name": instance.name,
                              "help": instance.description,
                              "class": instance.__class__.__name__,
                              "parameters": parameters,
                              "parameters_per_groups": parameters_per_groups,
                              "groups": groups})
        return workflows

    def _get_client_date_format(self):
        """Return the configured date format in the notation sent to the client.

        The four known strftime patterns are translated to their
        'dd/mm/yyyy'-like equivalent; any other configured format is
        returned unchanged.
        """
        date_format = self.jflow_config_reader.get_date_format()
        known_formats = {'%d/%m/%Y': 'dd/mm/yyyy',
                         '%d/%m/%y': 'dd/mm/yy',
                         '%Y/%m/%d': 'yyyy/mm/dd',
                         '%y/%m/%d': 'yy/mm/dd'}
        return known_formats.get(date_format, date_format)

    @cherrypy.expose
    @jsonify
    def run_workflow(self, **kwargs):
        """Run the workflow named by the "workflow_class" request parameter.

        Request parameters whose name contains MULTIPLE_TYPE_SPLITER once
        ("name___flag") are grouped under "name" as a list of (flag, value)
        tuples; names without the separator are passed through unchanged.
        Names containing the separator more than once are ignored for now
        (see the TODO below).

        Returns:
            {"status": 0, "content": <initial workflow status>} on success,
            {"status": 1, "content": <error message>} when anything fails.
        """
        try:
            kwargs_modified = {}
            for key, value in kwargs.items():
                parts = key.split(JFlowServer.MULTIPLE_TYPE_SPLITER)
                # if this is a classic Parameter
                if len(parts) == 1:
                    kwargs_modified[key] = value
                # if this is a MultiParameter
                elif len(parts) == 2:
                    kwargs_modified.setdefault(parts[0], []).append((parts[1], value))
                # if this is a MultiParameterList
                # TODO: on the interface side, send something like key.sub_key.1 ... so handle len == 3;
                # the goal is to get a structure such as: [[(sub_key1: val), (sub_key2: val)], [(sub_key1: val2), (sub_key2: val2)]]
            workflow = self.wfmanager.run_workflow(kwargs_modified["workflow_class"], kwargs_modified)
            return { "status" : 0, "content" : self.jsonify_workflow_status(workflow, True) }
        except Exception as err:
            # errors are reported to the client in the JSON answer
            return { "status" : 1, "content" : str(err) }

    @cherrypy.expose
    @jsonify
    def rerun_workflow(self, **kwargs):
        """Rerun the workflow given by the "workflow_id" request parameter and return its status."""
        workflow = self.wfmanager.rerun_workflow(kwargs["workflow_id"])
        return self.jsonify_workflow_status(workflow)

    @cherrypy.expose
    @jsonify
    def reset_workflow_component(self, **kwargs):
        """Reset one component of a workflow and return the workflow status.

        Expects the "workflow_id" and "component_name" request parameters.
        """
        workflow_id = kwargs["workflow_id"]
        component_name = kwargs["component_name"]
        workflow = self.wfmanager.reset_workflow_component(workflow_id, component_name)
        return self.jsonify_workflow_status(workflow)

    @cherrypy.expose
    def upload_light(self, **kwargs):
        """Store an uploaded file in a sub directory of the temporary directory.

        Expected request parameters: "uniq_directory", the name of the
        sub directory receiving the file (defaults to ""), and one file
        field. When several other fields are sent, only the last one
        found is stored.

        Raises:
            cherrypy.HTTPError: 400 when the request holds no file field.
        """
        uniq_directory = kwargs.get("uniq_directory", "")
        file_param = None
        for key in kwargs:
            if key != "uniq_directory":
                file_param = key
        if file_param is None:
            raise cherrypy.HTTPError(400, "No file to upload")

        # the file transfer can take a long time; by default cherrypy
        # limits responses to 300s; we increase it to 1h
        cherrypy.response.timeout = 3600

        # NOTE(review): uniq_directory comes from the client and is joined
        # to the temporary directory as is - confirm it cannot escape it
        file_dir = os.path.join( self.jflow_config_reader.get_tmp_directory(), uniq_directory )
        # the directory may already exist when several files are sent with
        # the same uniq_directory
        if not os.path.isdir( file_dir ):
            os.mkdir( file_dir )

        uploaded = kwargs[file_param]
        # keep the last path component only so that the client-provided
        # name cannot point outside of file_dir
        file_name = os.path.basename(uploaded.filename.encode('ascii','ignore'))

        # upload file by chunks; binary mode keeps the content untouched
        with open(os.path.join(file_dir, file_name), "wb") as FH_server_file:
            while True:
                data = uploaded.file.read(8192)
                if not data:
                    break
                FH_server_file.write(data)

    @cherrypy.expose
    @cherrypy.tools.noBodyProcess()
    @cherrypy.tools.CORS()
    def upload(self):
        """Store the files of a multipart POST in a sub directory of the temporary directory.

        The request body is parsed here with UploadFieldStorage instead of
        being processed by cherrypy. Every field except "uniq_directory"
        is treated as a file and linked under
        <tmp directory>/<uniq_directory>/<submitted file name>.
        """
        # the file transfer can take a long time; by default cherrypy
        # limits responses to 300s; we increase it to 1h
        cherrypy.response.timeout = 3600

        # convert the header keys to lower case
        lcHDRS = {}
        for key, val in cherrypy.request.headers.items():
            lcHDRS[key.lower()] = val

        # at this point we could limit the upload on content-length...
        # incomingBytes = int(lcHDRS['content-length'])

        # create our version of cgi.FieldStorage to parse the MIME encoded
        # form data where the file is contained
        formFields = UploadFieldStorage(fp=cherrypy.request.rfile,
                                        headers=lcHDRS,
                                        environ={'REQUEST_METHOD':'POST'},
                                        keep_blank_values=True)

        # we now create a link to the file, using the submitted
        # filename; if we renamed, there would be a failure because
        # the NamedTemporaryFile, used by our version of cgi.FieldStorage,
        # explicitly deletes the original filename
        for current in formFields.keys():
            if current == 'uniq_directory':
                continue
            currentFile = formFields[current]
            # NOTE(review): uniq_directory comes from the client and is
            # joined to the temporary directory as is - confirm it cannot
            # escape it
            fileDir = os.path.join(self.jflow_config_reader.get_tmp_directory(), formFields.getvalue("uniq_directory"))
            # create the directory once only: every file of the request
            # goes to the same directory
            if not os.path.isdir(fileDir):
                os.mkdir(fileDir)
            # keep the last path component only so that the client-provided
            # name cannot point outside of fileDir
            fileName = os.path.basename(currentFile.filename.encode('ascii','ignore'))
            os.link(currentFile.file.name, os.path.join(fileDir, fileName))

    @cherrypy.expose
    @jsonify
    def get_workflows_status(self, **kwargs):
        """Return the status of one workflow or of all the workflows.

        With a "workflow_id" request parameter, the "display" parameter is
        required: "list" returns the workflow status, "graph" returns the
        same status plus the "edges" of the workflow execution graph. Any
        other display value returns nothing.

        Without "workflow_id", the status of every workflow is returned as
        a list. The optional "metadata_filter" parameter, a comma separated
        list of values, restricts the list to the workflows having at least
        one of these values in their metadata.
        """
        if "workflow_id" in kwargs:
            workflow = self.wfmanager.get_workflow(kwargs["workflow_id"])
            if kwargs["display"] == "list":
                return self.jsonify_workflow_status(workflow)
            elif kwargs["display"] == "graph":
                g = workflow.get_execution_graph()
                status = self.jsonify_workflow_status(workflow)
                # force a plain list so the edges can always be serialized
                status["edges"] = list(g.edges())
                return status
        else:
            status = []
            # split the filter once instead of once per metadata value
            wanted_metadata = None
            if "metadata_filter" in kwargs:
                wanted_metadata = kwargs["metadata_filter"].split(",")
            for workflow in self.wfmanager.get_workflows():
                if wanted_metadata is None or any(wf_meta in wanted_metadata for wf_meta in workflow.metadata):
                    status.append(self.jsonify_workflow_status(workflow))
            return status

    def _webify_outputs(self, web_path, path):
        work_dir  = self.jflow_config_reader.get_work_directory()
        if work_dir.endswith("/"): work_dir = work_dir[:-1]
        socket_opt = self.jflow_config_reader.get_socket_options()
        return "http://" + socket_opt[0] + ":" + str(socket_opt[1]) + "/" + path.replace(work_dir, web_path)

    @cherrypy.expose
    @jsonify
    def get_workflow_outputs(self, **kwargs):
        """Return the URL of every output of the workflow given by "workflow_id".

        The result maps each component name to a dict associating each of
        its outputs with the URL built by _webify_outputs.
        """
        on_disk_outputs = self.wfmanager.get_workflow_ouputs(kwargs["workflow_id"])
        on_web_outputs = {}
        for cpt_name, cpt_outputs in on_disk_outputs.items():
            web_outputs = {}
            for outf in cpt_outputs:
                web_outputs[outf] = self._webify_outputs(JFlowServer.JFLOW_WDATA, cpt_outputs[outf])
            on_web_outputs[cpt_name] = web_outputs
        return on_web_outputs

    @cherrypy.expose
    @jsonify
    def validate_field(self, **kwargs):
        """Check a value against a parameter type.

        The "type" request parameter names the type to test. The value is
        taken from the first request parameter that is not "type",
        "callback" or "_".

        Returns:
            True when the test function built for the type accepts the
            value, the error message otherwise.
        """
        try:
            value_key = None
            for key in kwargs:
                if key not in ("type", "callback", "_"):
                    value_key = key
                    break
            create_test_function(kwargs["type"])(kwargs[value_key])
            return True
        except Exception as e:
            # the message is the answer expected by the client
            return str(e)

if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    parser.add_argument("--daemon", action="store_true", dest="daemon", default=False, help="Run the server as daemon")
    args = vars(parser.parse_args())

    # daemonize the server if asked to
    if args["daemon"]:
        from cherrypy.process.plugins import Daemonizer
        Daemonizer(cherrypy.engine).subscribe()

    # define the socket host and port
    jflowconf = JFlowConfigReader()
    socket_opts = jflowconf.get_socket_options()

    # remove any limit on the request body size; cherrypy's default is 100MB
    # (maybe we should just increase it ?)
    cherrypy.server.max_request_body_size = 0

    # increase server socket timeout to 60s; we are more tolerant of bad
    # quality client-server connections (cherrypy's default is 10s)
    cherrypy.server.socket_timeout = 60

    # serve the work directory as static content under /<JFLOW_WDATA>; the
    # key is a URL path, so it is built with "/" rather than os.path.join
    work_directory = jflowconf.get_work_directory()
    app_conf = {'/': {'tools.staticdir.root': work_directory},
                '/' + JFlowServer.JFLOW_WDATA: {'tools.staticdir.on'  : True,
                                                'tools.staticdir.dir' : work_directory}}

    cherrypy.config.update({'server.socket_host': socket_opts[0],
                            'server.socket_port': socket_opts[1]})
    # start the server
    cherrypy.quickstart(JFlowServer(), config=app_conf)