Commit 329771ca authored by Jerome Mariette's avatar Jerome Mariette
Browse files

No commit message

No commit message
parent 734b64ec
...@@ -21,7 +21,6 @@ import re ...@@ -21,7 +21,6 @@ import re
import sys import sys
import uuid import uuid
import pkgutil import pkgutil
import urllib2
import tempfile import tempfile
import pickle import pickle
import time import time
...@@ -30,7 +29,6 @@ from ConfigParser import ConfigParser, NoOptionError ...@@ -30,7 +29,6 @@ from ConfigParser import ConfigParser, NoOptionError
import types import types
import logging import logging
from inspect import getcallargs from inspect import getcallargs
from urlparse import urlparse
import jflow import jflow
import jflow.utils as utils import jflow.utils as utils
...@@ -224,43 +222,8 @@ class Workflow(threading.Thread): ...@@ -224,43 +222,8 @@ class Workflow(threading.Thread):
def get_components_nameid(self): def get_components_nameid(self):
return self.component_nameids.keys() return self.component_nameids.keys()
def _download_url(self, url): def wf_execution_wrapper(self):
print "Downloading " + url + " ..." getattr(self, self.function)()
uri_object = urlparse(url)
opener = urllib2.urlopen(url)
block_size = 8000
jflowconf = JFlowConfigReader()
tmp_directory = os.path.join(jflowconf.get_tmp_directory(), os.path.basename(tempfile.NamedTemporaryFile().name))
os.mkdir(tmp_directory)
downloaded_file_path = os.path.join(tmp_directory, os.path.basename(uri_object.path))
downloaded_file = open(downloaded_file_path, 'wb')
metadata = opener.info()
while True:
buffer = opener.read(block_size)
# End of download
if not buffer: break
# Parts of download
downloaded_file.write(buffer)
downloaded_file.close()
logging.getLogger("types.ifile").debug("URL file '{0}' successfully downloaded as: {1}".format(url, downloaded_file_path))
return downloaded_file_path
def _is_url(self, file):
uri_object = urlparse(file)
return uri_object.scheme != ''
def _download_input_files(self):
for param in self.parameters:
if param.type.__name__ == "inputfile":
if param.action == "append":
for i, file in enumerate(self.args[param.name]):
if self._is_url(file):
# download file
self.args[param.name][i] = self._download_url(file)
else:
if self._is_url(file):
# download file
self.args[param.name] = self._download_url(file)
def run(self): def run(self):
""" """
...@@ -272,7 +235,6 @@ class Workflow(threading.Thread): ...@@ -272,7 +235,6 @@ class Workflow(threading.Thread):
self.step = 0 self.step = 0
self.status = self.STATUS_STARTED self.status = self.STATUS_STARTED
self.end_time = None self.end_time = None
self._download_input_files()
self._serialize() self._serialize()
# if pre_processing has not been done yet # if pre_processing has not been done yet
if self.step == 0: if self.step == 0:
...@@ -282,7 +244,7 @@ class Workflow(threading.Thread): ...@@ -282,7 +244,7 @@ class Workflow(threading.Thread):
# if collecting components and running workflow has not been done yet # if collecting components and running workflow has not been done yet
if self.step == 1: if self.step == 1:
try: try:
getattr(self, self.function)() self.wf_execution_wrapper()
except SystemExit, e: except SystemExit, e:
self.status = self.STATUS_FAILED self.status = self.STATUS_FAILED
self.end_time = time.time() self.end_time = time.time()
...@@ -298,7 +260,7 @@ class Workflow(threading.Thread): ...@@ -298,7 +260,7 @@ class Workflow(threading.Thread):
# if the workflow was a dynamic one # if the workflow was a dynamic one
if self.step == 2: if self.step == 2:
try: try:
getattr(self, self.function)() self.wf_execution_wrapper()
except SystemExit, e: except SystemExit, e:
self.status = self.STATUS_FAILED self.status = self.STATUS_FAILED
self.end_time = time.time() self.end_time = time.time()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment