Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
genotoul-bioinfo
jflow
Commits
9474bb02
Commit
9474bb02
authored
Sep 23, 2015
by
Jerome Mariette
Browse files
from python2 to python3 > ok from command line
parent
3d47cbc6
Changes
1
Hide whitespace changes
Inline
Side-by-side
src/jflow/workflow.py
View file @
9474bb02
...
@@ -30,7 +30,7 @@ import datetime
...
@@ -30,7 +30,7 @@ import datetime
import
logging
import
logging
import
traceback
import
traceback
from
C
onfig
P
arser
import
ConfigParser
,
NoOptionError
from
c
onfig
p
arser
import
ConfigParser
,
NoOptionError
from
inspect
import
getcallargs
from
inspect
import
getcallargs
from
datetime
import
date
as
ddate
from
datetime
import
date
as
ddate
...
@@ -150,15 +150,15 @@ class Workflow(threading.Thread):
...
@@ -150,15 +150,15 @@ class Workflow(threading.Thread):
if
self
.
id
is
not
None
:
if
self
.
id
is
not
None
:
self
.
directory
=
self
.
manager
.
get_workflow_directory
(
self
.
name
,
self
.
id
)
self
.
directory
=
self
.
manager
.
get_workflow_directory
(
self
.
name
,
self
.
id
)
if
not
os
.
path
.
isdir
(
self
.
directory
):
if
not
os
.
path
.
isdir
(
self
.
directory
):
os
.
makedirs
(
self
.
directory
,
0751
)
os
.
makedirs
(
self
.
directory
,
0
o
751
)
if
self
.
stderr
is
None
:
if
self
.
stderr
is
None
:
self
.
stderr
=
self
.
_set_stderr
()
self
.
stderr
=
self
.
_set_stderr
()
self
.
_serialize
()
self
.
_serialize
()
self
.
internal_components
=
self
.
_import_internal_components
()
self
.
internal_components
=
self
.
_import_internal_components
()
self
.
external_components
=
self
.
_import_external_components
()
self
.
external_components
=
self
.
_import_external_components
()
def
add_input_directory
(
self
,
name
,
help
,
default
=
None
,
required
=
False
,
flag
=
None
,
def
add_input_directory
(
self
,
name
,
help
,
default
=
None
,
required
=
False
,
flag
=
None
,
group
=
"default"
,
display_name
=
None
,
get_files_fn
=
None
,
add_to
=
None
):
group
=
"default"
,
display_name
=
None
,
get_files_fn
=
None
,
add_to
=
None
):
new_param
=
InputDirectory
(
name
,
help
,
flag
=
flag
,
default
=
default
,
required
=
required
,
new_param
=
InputDirectory
(
name
,
help
,
flag
=
flag
,
default
=
default
,
required
=
required
,
...
@@ -227,7 +227,7 @@ class Workflow(threading.Thread):
...
@@ -227,7 +227,7 @@ class Workflow(threading.Thread):
new_param
=
MultiParameterList
(
name
,
help
,
flag
=
flag
,
required
=
required
,
group
=
group
,
display_name
=
display_name
)
new_param
=
MultiParameterList
(
name
,
help
,
flag
=
flag
,
required
=
required
,
group
=
group
,
display_name
=
display_name
)
self
.
__setattr__
(
name
,
new_param
)
self
.
__setattr__
(
name
,
new_param
)
def
add_parameter
(
self
,
name
,
help
,
default
=
None
,
type
=
types
.
StringType
,
choices
=
None
,
def
add_parameter
(
self
,
name
,
help
,
default
=
None
,
type
=
str
,
choices
=
None
,
required
=
False
,
flag
=
None
,
group
=
"default"
,
display_name
=
None
,
add_to
=
None
):
required
=
False
,
flag
=
None
,
group
=
"default"
,
display_name
=
None
,
add_to
=
None
):
new_param
=
ParameterFactory
.
factory
(
name
,
help
,
flag
=
flag
,
default
=
default
,
type
=
type
,
choices
=
choices
,
new_param
=
ParameterFactory
.
factory
(
name
,
help
,
flag
=
flag
,
default
=
default
,
type
=
type
,
choices
=
choices
,
required
=
required
,
group
=
group
,
display_name
=
display_name
)
required
=
required
,
group
=
group
,
display_name
=
display_name
)
...
@@ -241,7 +241,7 @@ class Workflow(threading.Thread):
...
@@ -241,7 +241,7 @@ class Workflow(threading.Thread):
self
.
params_order
.
append
(
name
)
self
.
params_order
.
append
(
name
)
self
.
__setattr__
(
name
,
new_param
)
self
.
__setattr__
(
name
,
new_param
)
def
add_parameter_list
(
self
,
name
,
help
,
default
=
None
,
type
=
types
.
StringType
,
choices
=
None
,
def
add_parameter_list
(
self
,
name
,
help
,
default
=
None
,
type
=
str
,
choices
=
None
,
required
=
False
,
flag
=
None
,
group
=
"default"
,
display_name
=
None
,
add_to
=
None
):
required
=
False
,
flag
=
None
,
group
=
"default"
,
display_name
=
None
,
add_to
=
None
):
if
default
==
None
:
default
=
[]
if
default
==
None
:
default
=
[]
new_param
=
ParameterList
(
name
,
help
,
flag
=
flag
,
default
=
default
,
type
=
type
,
choices
=
choices
,
new_param
=
ParameterList
(
name
,
help
,
flag
=
flag
,
default
=
default
,
type
=
type
,
choices
=
choices
,
...
@@ -290,7 +290,7 @@ class Workflow(threading.Thread):
...
@@ -290,7 +290,7 @@ class Workflow(threading.Thread):
def
_prepare_parameter
(
self
,
args
,
parameter
,
key
=
"name"
):
def
_prepare_parameter
(
self
,
args
,
parameter
,
key
=
"name"
):
new_param
=
None
new_param
=
None
# Retrieve value
# Retrieve value
if
args
.
has_key
(
parameter
.
__getattribute__
(
key
)
)
:
if
parameter
.
__getattribute__
(
key
)
in
args
:
value
=
args
[
parameter
.
__getattribute__
(
key
)]
value
=
args
[
parameter
.
__getattribute__
(
key
)]
elif
parameter
.
default
!=
None
:
elif
parameter
.
default
!=
None
:
value
=
parameter
.
default
value
=
parameter
.
default
...
@@ -342,7 +342,7 @@ class Workflow(threading.Thread):
...
@@ -342,7 +342,7 @@ class Workflow(threading.Thread):
if
param
.
__class__
==
MultiParameter
:
if
param
.
__class__
==
MultiParameter
:
new_param
=
MultiParameter
(
param
.
name
,
param
.
help
,
required
=
param
.
required
,
flag
=
param
.
flag
,
group
=
param
.
group
,
display_name
=
param
.
display_name
)
new_param
=
MultiParameter
(
param
.
name
,
param
.
help
,
required
=
param
.
required
,
flag
=
param
.
flag
,
group
=
param
.
group
,
display_name
=
param
.
display_name
)
new_param
.
sub_parameters
=
param
.
sub_parameters
new_param
.
sub_parameters
=
param
.
sub_parameters
if
args
.
has_key
(
param
.
name
)
:
if
param
.
name
in
args
:
sub_args
=
{}
sub_args
=
{}
for
sarg
in
args
[
param
.
name
]:
for
sarg
in
args
[
param
.
name
]:
sub_args
[
sarg
[
0
]]
=
sarg
[
1
]
sub_args
[
sarg
[
0
]]
=
sarg
[
1
]
...
@@ -352,7 +352,7 @@ class Workflow(threading.Thread):
...
@@ -352,7 +352,7 @@ class Workflow(threading.Thread):
elif
param
.
__class__
==
MultiParameterList
:
elif
param
.
__class__
==
MultiParameterList
:
new_param
=
MultiParameterList
(
param
.
name
,
param
.
help
,
required
=
param
.
required
,
flag
=
param
.
flag
,
group
=
param
.
group
,
display_name
=
param
.
display_name
)
new_param
=
MultiParameterList
(
param
.
name
,
param
.
help
,
required
=
param
.
required
,
flag
=
param
.
flag
,
group
=
param
.
group
,
display_name
=
param
.
display_name
)
new_param
.
sub_parameters
=
param
.
sub_parameters
new_param
.
sub_parameters
=
param
.
sub_parameters
if
args
.
has_key
(
param
.
name
)
:
if
param
.
name
in
args
:
for
idx
,
sargs
in
enumerate
(
args
[
param
.
name
]):
for
idx
,
sargs
in
enumerate
(
args
[
param
.
name
]):
new_multi_param
=
MultiParameter
(
param
.
name
+
'_'
+
str
(
idx
),
''
,
required
=
False
,
flag
=
None
,
group
=
"default"
,
display_name
=
None
)
new_multi_param
=
MultiParameter
(
param
.
name
+
'_'
+
str
(
idx
),
''
,
required
=
False
,
flag
=
None
,
group
=
"default"
,
display_name
=
None
)
sub_args
=
{}
sub_args
=
{}
...
@@ -370,7 +370,7 @@ class Workflow(threading.Thread):
...
@@ -370,7 +370,7 @@ class Workflow(threading.Thread):
gr
=
digraph
()
gr
=
digraph
()
# build a all_nodes table to store all nodes
# build a all_nodes table to store all nodes
all_nodes
=
{}
all_nodes
=
{}
for
ioparameter
in
self
.
__dict__
.
values
():
for
ioparameter
in
list
(
self
.
__dict__
.
values
()
)
:
if
issubclass
(
ioparameter
.
__class__
,
InputFile
):
if
issubclass
(
ioparameter
.
__class__
,
InputFile
):
gr
.
add_node
(
ioparameter
.
name
)
gr
.
add_node
(
ioparameter
.
name
)
gr
.
add_node_attribute
(
ioparameter
.
name
,
self
.
INPUTFILE_GRAPH_LABEL
)
gr
.
add_node_attribute
(
ioparameter
.
name
,
self
.
INPUTFILE_GRAPH_LABEL
)
...
@@ -412,7 +412,7 @@ class Workflow(threading.Thread):
...
@@ -412,7 +412,7 @@ class Workflow(threading.Thread):
gr
.
add_node_attribute
(
cpt
.
get_nameid
(),
cpt
.
get_nameid
())
gr
.
add_node_attribute
(
cpt
.
get_nameid
(),
cpt
.
get_nameid
())
all_nodes
[
cpt
.
get_nameid
()]
=
None
all_nodes
[
cpt
.
get_nameid
()]
=
None
for
cpt
in
self
.
components
:
for
cpt
in
self
.
components
:
for
ioparameter
in
cpt
.
__dict__
.
values
():
for
ioparameter
in
list
(
cpt
.
__dict__
.
values
()
)
:
if
issubclass
(
ioparameter
.
__class__
,
InputFile
)
or
issubclass
(
ioparameter
.
__class__
,
InputFileList
)
or
issubclass
(
ioparameter
.
__class__
,
InputDirectory
):
if
issubclass
(
ioparameter
.
__class__
,
InputFile
)
or
issubclass
(
ioparameter
.
__class__
,
InputFileList
)
or
issubclass
(
ioparameter
.
__class__
,
InputDirectory
):
for
parent
in
ioparameter
.
parent_linkTrace_nameid
:
for
parent
in
ioparameter
.
parent_linkTrace_nameid
:
try
:
gr
.
add_edge
((
parent
,
ioparameter
.
linkTrace_nameid
))
try
:
gr
.
add_edge
((
parent
,
ioparameter
.
linkTrace_nameid
))
...
@@ -423,12 +423,12 @@ class Workflow(threading.Thread):
...
@@ -423,12 +423,12 @@ class Workflow(threading.Thread):
except
:
pass
except
:
pass
# check if all nodes are connected
# check if all nodes are connected
for
edge
in
gr
.
edges
():
for
edge
in
gr
.
edges
():
if
all_nodes
.
has_key
(
edge
[
0
])
:
if
edge
[
0
]
in
all_nodes
:
del
all_nodes
[
edge
[
0
]]
del
all_nodes
[
edge
[
0
]]
if
all_nodes
.
has_key
(
edge
[
1
])
:
if
edge
[
1
]
in
all_nodes
:
del
all_nodes
[
edge
[
1
]]
del
all_nodes
[
edge
[
1
]]
# then remove all unconnected nodes: to delete inputs not defined by the user
# then remove all unconnected nodes: to delete inputs not defined by the user
for
orphan_node
in
all_nodes
.
keys
():
for
orphan_node
in
list
(
all_nodes
.
keys
()
)
:
gr
.
del_node
(
orphan_node
)
gr
.
del_node
(
orphan_node
)
return
gr
return
gr
...
@@ -593,10 +593,10 @@ class Workflow(threading.Thread):
...
@@ -593,10 +593,10 @@ class Workflow(threading.Thread):
Threading uses Lock Object, do not consider these objects when serializing a workflow
Threading uses Lock Object, do not consider these objects when serializing a workflow
"""
"""
odict
=
self
.
__dict__
.
copy
()
odict
=
self
.
__dict__
.
copy
()
del
odict
[
'_
Thread__
started'
]
del
odict
[
'_started'
]
del
odict
[
'_
Thread__
block'
]
del
odict
[
'_block'
]
del
odict
[
'_
Thread__
stderr'
]
del
odict
[
'_stderr'
]
if
odict
.
has_key
(
'external_components'
)
:
if
'external_components'
in
odict
:
del
odict
[
'external_components'
]
del
odict
[
'external_components'
]
return
odict
return
odict
...
@@ -666,7 +666,7 @@ class Workflow(threading.Thread):
...
@@ -666,7 +666,7 @@ class Workflow(threading.Thread):
pgparameters
,
parameters_order
=
{},
[]
pgparameters
,
parameters_order
=
{},
[]
for
param
in
parameters
:
for
param
in
parameters
:
if
param
.
group
not
in
parameters_order
:
parameters_order
.
append
(
param
.
group
)
if
param
.
group
not
in
parameters_order
:
parameters_order
.
append
(
param
.
group
)
if
pg
param
eters
.
has_key
(
param
.
group
)
:
if
param
.
group
in
pgparameters
:
pgparameters
[
param
.
group
].
append
(
param
)
pgparameters
[
param
.
group
].
append
(
param
)
else
:
else
:
pgparameters
[
param
.
group
]
=
[
param
]
pgparameters
[
param
.
group
]
=
[
param
]
...
@@ -675,7 +675,7 @@ class Workflow(threading.Thread):
...
@@ -675,7 +675,7 @@ class Workflow(threading.Thread):
def
get_parameters
(
self
):
def
get_parameters
(
self
):
params
=
[]
params
=
[]
for
param
in
self
.
params_order
:
for
param
in
self
.
params_order
:
for
attribute_value
in
self
.
__dict__
.
values
():
for
attribute_value
in
list
(
self
.
__dict__
.
values
()
)
:
if
(
issubclass
(
attribute_value
.
__class__
,
AbstractParameter
))
and
param
==
attribute_value
.
name
:
if
(
issubclass
(
attribute_value
.
__class__
,
AbstractParameter
))
and
param
==
attribute_value
.
name
:
params
.
append
(
attribute_value
)
params
.
append
(
attribute_value
)
return
params
return
params
...
@@ -696,9 +696,9 @@ class Workflow(threading.Thread):
...
@@ -696,9 +696,9 @@ class Workflow(threading.Thread):
def
add_component
(
self
,
component_name
,
args
=
[],
kwargs
=
{},
component_prefix
=
"default"
):
def
add_component
(
self
,
component_name
,
args
=
[],
kwargs
=
{},
component_prefix
=
"default"
):
# first build and check if this component is OK
# first build and check if this component is OK
if
self
.
internal_components
.
has_key
(
component_name
)
or
self
.
external_components
.
has_key
(
component_name
)
:
if
component_name
in
self
.
internal_components
or
component_name
in
self
.
external_components
:
if
self
.
internal_components
.
has_key
(
component_name
)
:
if
component_name
in
self
.
internal_components
:
my_pckge
=
__import__
(
self
.
internal_components
[
component_name
],
globals
(),
locals
(),
[
component_name
],
-
1
)
my_pckge
=
__import__
(
self
.
internal_components
[
component_name
],
globals
(),
locals
(),
[
component_name
],
-
1
)
# build the object and define required field
# build the object and define required field
cmpt_object
=
getattr
(
my_pckge
,
component_name
)()
cmpt_object
=
getattr
(
my_pckge
,
component_name
)()
...
@@ -753,7 +753,7 @@ class Workflow(threading.Thread):
...
@@ -753,7 +753,7 @@ class Workflow(threading.Thread):
return
cmpt_object
return
cmpt_object
else
:
else
:
raise
ImportError
(
component_name
+
" component cannot be loaded, available components are: {0}"
.
format
(
raise
ImportError
(
component_name
+
" component cannot be loaded, available components are: {0}"
.
format
(
", "
.
join
(
self
.
internal_components
.
keys
()
+
self
.
external_components
.
keys
())))
", "
.
join
(
list
(
self
.
internal_components
.
keys
()
)
+
list
(
self
.
external_components
.
keys
())))
)
def
pre_process
(
self
):
def
pre_process
(
self
):
pass
pass
...
@@ -793,7 +793,7 @@ class Workflow(threading.Thread):
...
@@ -793,7 +793,7 @@ class Workflow(threading.Thread):
return
os
.
path
.
join
(
self
.
directory
,
component_name
+
"_"
+
component_prefix
)
return
os
.
path
.
join
(
self
.
directory
,
component_name
+
"_"
+
component_prefix
)
def
get_components_nameid
(
self
):
def
get_components_nameid
(
self
):
return
self
.
component_nameids
.
keys
()
return
list
(
self
.
component_nameids
.
keys
()
)
def
wf_execution_wrapper
(
self
):
def
wf_execution_wrapper
(
self
):
getattr
(
self
,
self
.
function
)()
getattr
(
self
,
self
.
function
)()
...
@@ -824,7 +824,7 @@ class Workflow(threading.Thread):
...
@@ -824,7 +824,7 @@ class Workflow(threading.Thread):
self
.
components
=
[]
self
.
components
=
[]
self
.
status
=
self
.
STATUS_STARTED
self
.
status
=
self
.
STATUS_STARTED
self
.
wf_execution_wrapper
()
self
.
wf_execution_wrapper
()
except
SystemExit
,
e
:
except
SystemExit
:
self
.
status
=
self
.
STATUS_FAILED
self
.
status
=
self
.
STATUS_FAILED
self
.
end_time
=
time
.
time
()
self
.
end_time
=
time
.
time
()
self
.
_serialize
()
self
.
_serialize
()
...
@@ -844,7 +844,7 @@ class Workflow(threading.Thread):
...
@@ -844,7 +844,7 @@ class Workflow(threading.Thread):
self
.
components
=
[]
self
.
components
=
[]
self
.
status
=
self
.
STATUS_STARTED
self
.
status
=
self
.
STATUS_STARTED
self
.
wf_execution_wrapper
()
self
.
wf_execution_wrapper
()
except
SystemExit
,
e
:
except
SystemExit
:
self
.
status
=
self
.
STATUS_FAILED
self
.
status
=
self
.
STATUS_FAILED
self
.
end_time
=
time
.
time
()
self
.
end_time
=
time
.
time
()
self
.
_serialize
()
self
.
_serialize
()
...
@@ -1049,7 +1049,7 @@ class Workflow(threading.Thread):
...
@@ -1049,7 +1049,7 @@ class Workflow(threading.Thread):
workflow_dump
.
close
()
workflow_dump
.
close
()
def
_component_is_duplicated
(
self
,
component
):
def
_component_is_duplicated
(
self
,
component
):
if
component
.
get_nameid
()
in
self
.
component_nameids
.
keys
():
if
component
.
get_nameid
()
in
list
(
self
.
component_nameids
.
keys
()
)
:
return
True
return
True
return
False
return
False
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment