Commit fd7f0642 authored by Frédéric Escudié

Fix bug with concurrent access to new workflow ID.

parent 8582e3be
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__license__ = 'GNU General Public License'
__version__ = '0.1.0'
import os
import re
import time
import json
import random
def priority_compare_key(cmp_func):
"""
@summary: Converts a cmp= function into a key= function (python 2.x to 3.x).
"""
class K:
def __init__(self, obj, *args):
self.obj = obj
def __lt__(self, other):
return cmp_func(self.obj, other.obj) < 0
def __gt__(self, other):
return cmp_func(self.obj, other.obj) > 0
def __eq__(self, other):
return cmp_func(self.obj, other.obj) == 0
def __le__(self, other):
return cmp_func(self.obj, other.obj) <= 0
def __ge__(self, other):
return cmp_func(self.obj, other.obj) >= 0
def __ne__(self, other):
return cmp_func(self.obj, other.obj) != 0
return K
def priority_compare( a, b ):
"""
@summary: The comparison rules to establish the priorities between two
requests.
"""
compare = -1 # By default, a comes first
if b['timestamp'] < a['timestamp']:
compare = 1
elif b['timestamp'] == a['timestamp']:
if b['PID'] < a['PID']:
compare = 1
elif b['PID'] == a['PID']:
if b['random'] < a['random']:
compare = 1
return compare
def get_priority( requests, evaluated_request ):
"""
@summary: Returns the access priority for the evaluated request.
@param requests: [list] The list of all access requests.
@param evaluated_request: [dict] The evaluated access request.
@return: [int] The access priority for the evaluated request. The highest
priority is 1.
"""
priority = None
requests.sort( key=priority_compare_key(priority_compare) )
for idx, current_request in enumerate(requests):
if current_request['timestamp'] == evaluated_request['timestamp'] and current_request['PID'] == evaluated_request['PID'] and current_request['random'] == evaluated_request['random']:
if priority is None:
priority = idx+1
else:
raise Exception("Two jobs try to access in write mode at the same ressource.")
return priority
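# Illustrative example: given two pending requests
# {'timestamp': 10.0, 'PID': 100, 'random': 42} and {'timestamp': 10.5, 'PID': 99, 'random': 7},
# get_priority() returns 1 for the first (earliest timestamp) and 2 for the second;
# ties on timestamp are broken by PID (smaller first), then by the random token.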
def get_requests( request_dir, shared_file ):
"""
@summary: Returns the list of access requests on the shared file.
@param request_dir: [str] The path to the directory where the temporary
files (request and lock) are stored.
@param shared_file: [str] The path to the file shared in writing mode
between the concurrent processes.
@return: [list] The list of access requests.
"""
all_requests = list()
pattern = os.path.basename(shared_file) + r"_accessRequest_[^\|]+\|\d+\|\d+" # matches "<basename>_accessRequest_<timestamp>|<PID>|<random>"
for request_file in os.listdir(request_dir):
if re.match(pattern, request_file):
try:
fh_request = open(os.path.join(request_dir, request_file))
all_requests.append( json.load(fh_request) )
fh_request.close()
except:
pass
return all_requests
def stopRetry( priorities, max_stable_priorities ):
"""
@summary: Checks the evolution of the priority along the access retries and
returns True if the priority has not changed during the last N
retries. This is to prevent deadlocks.
@param priorities: [list] The priorities of each access retry.
@param max_stable_priorities: [int] The maximum number of retries with the
same priority.
@return: [bool] Returns True if the priority does not change.
"""
if len(priorities) < max_stable_priorities:
return False
stop_retry = True
for idx in range(max_stable_priorities-1):
# With max_stable_priorities = 4:
# idx = 0: priorities[-1] != priorities[-2]
# idx = 1: priorities[-2] != priorities[-3]
# idx = 2: priorities[-3] != priorities[-4]
if priorities[-(1+idx)] != priorities[-(2+idx)]:
stop_retry = False
return stop_retry
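# Illustrative example: with max_stable_priorities = 3,
# stopRetry([3, 2, 2, 2], 3) returns True (the last three retries all kept priority 2),
# whereas stopRetry([3, 2, 2, 1], 3) returns False.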
def exec_on_shared( process_fct, shared_file, tmp_dir="/tmp", time_between_retry=0.7, max_stable_priorities=10 ):
"""
@summary: Manages concurrent write access to a shared file between several
processes.
@param process_fct: [func] The function executed when the shared resource
is available for the current job.
@param shared_file: [str] The path to the file shared in writing mode
between the concurrent processes.
@param tmp_dir: [str] The path to the directory where the temporary files
(request and lock) are stored.
@param time_between_retry: [float] The number of seconds between each
retry.
@param max_stable_priorities: [int] The number of retries with the same
priority before the situation is considered a deadlock.
@return: The return value of process_fct, or None if process_fct returns
nothing.
@note: Known bug: a race remains if a node with a late timestamp executes its
reservation, sleep and priority check within the window between the
priority check and the lock creation of another node with a valid
timestamp.
"""
fct_return = None
retry = True
lock_file = os.path.join( tmp_dir, os.path.basename(shared_file) + "_lock" )
# Get request info
current_request = {
'timestamp': time.time(),
'PID': os.getpid(),
'random': random.randint(1, 10000000000000000)
}
# Set request file
request_filename = "{}_accessRequest_{}|{}|{}".format(os.path.basename(shared_file), current_request['timestamp'], current_request['PID'], current_request['random'])
current_request_file = os.path.join( tmp_dir, request_filename )
try:
# Write request file
fh_request = open(current_request_file, "w")
fh_request.write( json.dumps(current_request) )
fh_request.close()
# Wait
time.sleep( time_between_retry )
# Try to access the shared resource
priorities = list()
while retry:
access_requests = get_requests( tmp_dir, shared_file ) # Retrieve all access requests
priorities.append( get_priority(access_requests, current_request) ) # Position of the current request in the requests' execution order
if priorities[-1] == 1 and not os.path.exists(lock_file): # the lock file prevents errors from jobs launched after the job holding the resource but running on a node with a skewed timestamp
open(lock_file, "w").close()
# Process
fct_return = process_fct()
retry = False
else:
if stopRetry(priorities, max_stable_priorities):
raise Exception("Dead lock to access at the ressource.")
time.sleep( time_between_retry )
finally:
# Delete current access request
if os.path.exists(current_request_file):
os.remove( current_request_file )
if os.path.exists(lock_file):
os.remove( lock_file )
return fct_return
\ No newline at end of file
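For reference, a minimal usage sketch of exec_on_shared(): each concurrent process wraps its critical section in a function and hands it to exec_on_shared(), which serializes access through the request and lock files above. The shared counter file path and the increment logic below are illustrative assumptions, not part of jflow.

import os
from jflow.concurrent_access import exec_on_shared

SHARED_FILE = "/tmp/shared_counter.txt"  # hypothetical shared resource, for illustration only

def increment_counter():
    # Critical section: read the current value (0 if the file does not exist yet)
    # and write back value + 1. Runs only while this process holds the lock.
    value = 0
    if os.path.exists(SHARED_FILE):
        with open(SHARED_FILE) as fh:
            value = int(fh.readline().strip())
    with open(SHARED_FILE, "w") as fh:
        fh.write(str(value + 1))
    return value + 1

new_value = exec_on_shared(increment_counter, SHARED_FILE, tmp_dir="/tmp")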
@@ -27,17 +27,15 @@ import threading
import logging
import jflow.utils as utils
import jflow.concurrent_access as concurrent
from jflow.config_reader import JFlowConfigReader
class WorkflowsManager(object):
IDS_FILE_NAME = "jflowIDs.txt"
WORKFLOWS_DUMP_FILE_NAME = ".workflows.dump"
WF_DIRECTORY_PREFIX = "wf"
lock_ids_file = threading.Lock()
lock_workflows_file = threading.Lock()
def __init__(self):
self.config_reader = JFlowConfigReader()
if not os.path.isdir(self.get_output_directory()):
@@ -46,23 +44,23 @@ class WorkflowsManager(object):
self.ids_file = os.path.join(self.get_output_directory(), self.IDS_FILE_NAME)
def _dump_workflows(self, workflows):
WorkflowsManager.lock_workflows_file.acquire()
# first load the existing workflows
try:
wdfh = open(self.dump_file, "rb")
workflows_dump = pickle.load(wdfh)
def dump_func():
# first load the existing workflows
try:
wdfh = open(self.dump_file, "rb")
workflows_dump = pickle.load(wdfh)
wdfh.close()
except:
workflows_dump = {}
# then add the new ones
for workflow in workflows:
workflows_dump[utils.get_nb_string(workflow.id)] = {"dump_path": workflow.dump_path,
"object": workflow.minimize()}
# and save them
wdfh = open(self.dump_file, "wb")
pickle.dump(workflows_dump, wdfh)
wdfh.close()
except:
workflows_dump = {}
# then add the new ones
for workflow in workflows:
workflows_dump[utils.get_nb_string(workflow.id)] = {"dump_path": workflow.dump_path,
"object": workflow.minimize()}
# and save them
wdfh = open(self.dump_file, "wb")
pickle.dump(workflows_dump, wdfh)
wdfh.close()
WorkflowsManager.lock_workflows_file.release()
concurrent.exec_on_shared( dump_func, self.dump_file, self.config_reader.get_tmp_directory() )
def get_available_workflows(self, function="process"):
if function.__class__.__name__ == "str":
@@ -115,34 +113,34 @@ class WorkflowsManager(object):
# Add the workflow dump path to the workflows dump
self._dump_workflows([workflow])
return workflow
def delete_workflow(self, workflow_id):
from jflow.workflow import Workflow
WorkflowsManager.lock_workflows_file.acquire()
try:
awfh = open(self.dump_file, "rb")
all_workflows_dump = pickle.load(awfh)
def delete_func():
try:
awfh = open(self.dump_file, "rb")
all_workflows_dump = pickle.load(awfh)
awfh.close()
except:
all_workflows_dump = {}
rworkflow_id = utils.get_nb_string(workflow_id)
try:
workflow_dump = open(all_workflows_dump[rworkflow_id]["dump_path"], "rb")
workflow = pickle.load(workflow_dump)
workflow_dump.close()
# if workflow is not in a running status
if workflow.get_status() in [Workflow.STATUS_COMPLETED, Workflow.STATUS_FAILED, Workflow.STATUS_ABORTED]:
workflow.delete()
del all_workflows_dump[rworkflow_id]
except:
logging.getLogger("jflow").debug("Workflow #" + rworkflow_id + " connot be retrieved in the available workflows!")
raise Exception("Workflow #" + rworkflow_id + " connot be retrieved in the available workflows!")
# and save them
awfh = open(self.dump_file, "wb")
pickle.dump(all_workflows_dump, awfh)
awfh.close()
except:
all_workflows_dump = {}
rworkflow_id = utils.get_nb_string(workflow_id)
try:
workflow_dump = open(all_workflows_dump[rworkflow_id]["dump_path"], "rb")
workflow = pickle.load(workflow_dump)
workflow_dump.close()
# if workflow is not in a running status
if workflow.get_status() in [Workflow.STATUS_COMPLETED, Workflow.STATUS_FAILED, Workflow.STATUS_ABORTED]:
workflow.delete()
del all_workflows_dump[rworkflow_id]
except:
logging.getLogger("jflow").debug("Workflow #" + rworkflow_id + " connot be retrieved in the available workflows!")
raise Exception("Workflow #" + rworkflow_id + " connot be retrieved in the available workflows!")
# and save them
awfh = open(self.dump_file, "wb")
pickle.dump(all_workflows_dump, awfh)
awfh.close()
WorkflowsManager.lock_workflows_file.release()
concurrent.exec_on_shared( delete_func, self.dump_file, self.config_reader.get_tmp_directory() )
def get_workflow_errors(self, workflow_id):
workflow = self.get_workflow(workflow_id)
return workflow.get_errors()
@@ -209,11 +207,9 @@ class WorkflowsManager(object):
def get_workflow_directory(self, wname, wid):
return os.path.join(os.path.join(self.config_reader.get_work_directory(), wname),
self.WF_DIRECTORY_PREFIX + utils.get_nb_string(wid))
def get_next_id(self):
WorkflowsManager.lock_ids_file.acquire()
cid = 1
try:
def next_id_func():
if os.path.isfile(self.ids_file):
ifh = open(self.ids_file)
cid = int(ifh.readline().strip())
@@ -226,6 +222,5 @@ class WorkflowsManager(object):
ifh = open(self.ids_file, "w")
ifh.write("1")
ifh.close()
finally:
WorkflowsManager.lock_ids_file.release()
return cid
return 1
return concurrent.exec_on_shared( next_id_func, self.ids_file, self.config_reader.get_tmp_directory() )