Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
genotoul-bioinfo
jflow
Commits
5c5c398e
Commit
5c5c398e
authored
Apr 16, 2014
by
Jerome Mariette
Browse files
make a double execution only when dynamic components are present
parent
23bdf724
Changes
1
Hide whitespace changes
Inline
Side-by-side
src/jflow/workflow.py
View file @
5c5c398e
...
...
@@ -75,10 +75,12 @@ class Workflow(threading.Thread):
self
.
start_time
=
None
self
.
end_time
=
None
self
.
step
=
None
self
.
dynamic_component_present
=
False
self
.
function
=
function
self
.
parameters_section
=
parameters_section
self
.
comp_pckg
=
self
.
_import_components
()
self
.
engine_arguments
=
''
self
.
component_nameids_is_init
=
False
self
.
component_nameids
=
{}
# try to parse engine arguments
try
:
...
...
@@ -101,56 +103,6 @@ class Workflow(threading.Thread):
os
.
makedirs
(
self
.
directory
,
0751
)
self
.
_serialize
()
def
init_component_nameids
(
self
):
logging
.
getLogger
(
"Workflow.init_component_nameids"
).
debug
(
"Initializing components names ids"
)
logging
.
getLogger
(
"Workflow.init_component_nameids"
).
debug
(
"Name ids in the array : "
+
str
(
self
.
component_nameids
.
keys
()))
workflow_command_lines
=
inspect
.
getsourcelines
(
getattr
(
self
,
self
.
function
))[
0
]
for
i
in
range
(
0
,
len
(
workflow_command_lines
)):
line
=
workflow_command_lines
[
i
]
logging
.
getLogger
(
"Workflow.init_component_nameids"
).
debug
(
"Line : "
+
line
)
if
line
.
find
(
"self.add_component"
)
!=
-
1
and
not
re
.
search
(
'^\s*\#'
,
line
)
:
while
re
.
search
(
'
\\
\s*$'
,
line
)
or
re
.
search
(
',\s*$'
,
line
):
# While line ends with backslash
# Concatenate next line
i
+=
1
next_line
=
workflow_command_lines
[
i
]
if
re
.
search
(
'^\s*\#'
,
next_line
)
:
nex_line
=
"
\\
"
if
re
.
search
(
'
\\
\s*$'
,
line
):
line
=
re
.
search
(
'(.+)
\\
\s*$'
,
line
).
groups
()[
0
]
+
next_line
elif
re
.
search
(
',\s*$'
,
line
):
line
=
re
.
search
(
'(.+),\s*$'
,
line
).
groups
()[
0
]
+
next_line
component_prefix
=
"default"
component_name
=
line
.
strip
().
split
(
"self.add_component"
)[
1
][
1
:
-
1
].
split
(
","
)[
0
][
1
:
-
1
]
arguments
=
re
.
search
(
"self.add_component\s*\((.+)\)"
,
line
).
groups
()[
0
]
named_prefix
=
re
.
search
(
"component_prefix\s*=\s*([^,]+)"
,
arguments
)
# Component prefix is in named argument
if
named_prefix
:
component_prefix
=
named_prefix
.
groups
()[
0
].
strip
().
strip
(
'"'
)
# Component prefix is missing or in unamed argument
else
:
without_named_args
=
re
.
sub
(
",?\s*\S+\s*=\s*\S+"
,
""
,
arguments
)
# Discard named arguments
fields
=
without_named_args
.
strip
(
","
).
split
(
","
)
# Group brackets arguments
args_split_clean
=
[]
current_idx
=
0
current_arg
=
""
while
current_idx
<
len
(
fields
)
:
current_arg
=
current_arg
+
fields
[
current_idx
]
+
','
open_brackets
=
current_arg
.
count
(
"["
)
close_brackets
=
current_arg
.
count
(
"]"
)
if
open_brackets
==
close_brackets
:
args_split_clean
.
append
(
current_arg
[:
-
1
]
)
current_arg
=
""
current_idx
+=
1
# Find component prefix
if
len
(
args_split_clean
)
>=
4
:
component_prefix
=
args_split_clean
[
3
].
strip
().
strip
(
'"'
)
logging
.
getLogger
(
"Workflow.init_component_nameids"
).
debug
(
"Component added : "
+
component_name
+
"."
+
component_prefix
)
if
self
.
_component_is_duplicated
(
component_name
+
"."
+
component_prefix
):
raise
ValueError
(
"Component "
+
component_name
+
" with prefix "
+
component_prefix
+
" already exist in this pipeline!"
)
self
.
component_nameids
[
component_name
+
"."
+
component_prefix
]
=
None
@
staticmethod
def
config_parser
(
arg_lines
):
for
arg
in
arg_lines
:
...
...
@@ -206,14 +158,36 @@ class Workflow(threading.Thread):
cmpt_object
.
prefix
=
component_prefix
if
kwargs
:
cmpt_object
.
define_parameters
(
**
kwargs
)
else
:
cmpt_object
.
define_parameters
(
*
args
)
# add the component
self
.
components
.
append
(
cmpt_object
)
# if this one require a dynamic pipeline, execute the first part
# there is a dynamic component
if
cmpt_object
.
is_dynamic
():
self
.
_execute_weaver
()
# update outputs
for
output
in
cmpt_object
.
get_dynamic_outputs
():
output
.
update
()
self
.
dynamic_component_present
=
True
# if already init, add the component to the list and check if weaver should be executed
if
self
.
component_nameids_is_init
:
# add the component
self
.
components
.
append
(
cmpt_object
)
self
.
_execute_weaver
()
# update outputs
for
output
in
cmpt_object
.
get_dynamic_outputs
():
output
.
update
()
else
:
if
self
.
_component_is_duplicated
(
cmpt_object
):
raise
ValueError
(
"Component "
+
cmpt_object
.
__class__
.
__name__
+
" with prefix "
+
cmpt_object
.
prefix
+
" already exist in this pipeline!"
)
self
.
component_nameids
[
cmpt_object
.
get_nameid
()]
=
None
self
.
components
=
[]
else
:
if
self
.
component_nameids_is_init
:
# add the component
self
.
components
.
append
(
cmpt_object
)
elif
not
self
.
component_nameids_is_init
and
not
self
.
dynamic_component_present
:
self
.
components
.
append
(
cmpt_object
)
else
:
if
self
.
_component_is_duplicated
(
cmpt_object
):
raise
ValueError
(
"Component "
+
cmpt_object
.
__class__
.
__name__
+
" with prefix "
+
cmpt_object
.
prefix
+
" already exist in this pipeline!"
)
self
.
component_nameids
[
cmpt_object
.
get_nameid
()]
=
None
return
cmpt_object
else
:
raise
ImportError
(
component_name
+
" component cannot be loaded, available components are: {0}"
.
format
(
", "
.
join
(
self
.
comp_pckg
.
keys
())))
...
...
@@ -249,19 +223,34 @@ class Workflow(threading.Thread):
"""
# if this is the first time the workflow run
if
self
.
step
==
None
:
self
.
init_component_nameids
()
self
.
start_time
=
time
.
time
()
self
.
step
=
0
self
.
status
=
self
.
STATUS_STARTED
self
.
end_time
=
None
self
.
end_time
=
None
self
.
_serialize
()
# if pre_processing has not been done yet
if
self
.
step
==
0
:
self
.
pre_process
()
self
.
step
=
1
self
.
_serialize
()
# if running workflow has not been done yet
# if
collecting components and
running workflow has not been done yet
if
self
.
step
==
1
:
try
:
getattr
(
self
,
self
.
function
)()
except
SystemExit
,
e
:
self
.
status
=
self
.
STATUS_FAILED
self
.
end_time
=
time
.
time
()
self
.
_serialize
()
raise
self
.
component_nameids_is_init
=
True
if
self
.
dynamic_component_present
:
self
.
step
=
2
else
:
self
.
_execute_weaver
()
self
.
step
=
3
self
.
_serialize
()
# if the workflow was a dynamic one
if
self
.
step
==
2
:
try
:
getattr
(
self
,
self
.
function
)()
except
SystemExit
,
e
:
...
...
@@ -271,10 +260,10 @@ class Workflow(threading.Thread):
raise
if
len
(
self
.
components
)
>
0
:
self
.
_execute_weaver
()
self
.
step
=
2
self
.
step
=
3
self
.
_serialize
()
# if post processing has ne been done yet
if
self
.
step
==
2
:
if
self
.
step
==
3
:
self
.
post_process
()
if
self
.
status
==
self
.
STATUS_STARTED
:
self
.
status
=
self
.
STATUS_COMPLETED
self
.
end_time
=
time
.
time
()
...
...
@@ -564,8 +553,8 @@ class Workflow(threading.Thread):
pickle
.
dump
(
self
,
workflow_dump
)
workflow_dump
.
close
()
def
_component_is_duplicated
(
self
,
component
_nameid
):
if
component_nameid
in
self
.
component_nameids
.
keys
():
def
_component_is_duplicated
(
self
,
component
):
if
component
.
get
_nameid
()
in
self
.
component_nameids
.
keys
():
return
True
return
False
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment