Commit 18cb7dc1 authored by Frédéric Escudié's avatar Frédéric Escudié
Browse files

Add metadata in concurrent access request.

parent e6be3479
......@@ -89,7 +89,7 @@ def get_requests( request_dir, shared_file ):
@return: [list] The list of access requests.
"""
all_requests = list()
pattern = os.path.basename(shared_file) + "_accessRequest_[^\|]+\|\d+\|\d+"
pattern = os.path.basename(shared_file) + "_accessRequest_[^\-]+\-\d+\-\d+"
for request_file in os.listdir(request_dir):
if re.match(pattern, request_file):
try:
......@@ -122,7 +122,7 @@ def stopRetry( priorities, max_stable_priorities ):
stop_retry = False
return stop_retry
def exec_on_shared( process_fct, shared_file, tmp_dir="/tmp", time_between_retry=0.7, max_stable_priorities=100 ):
def exec_on_shared( process_fct, shared_file, tmp_dir="/tmp", time_between_retry=0.7, max_stable_priorities=100, metadata=None ):
"""
@summary: Manages concurrent access in writing mode between several
processes on a shared file.
......@@ -132,11 +132,12 @@ def exec_on_shared( process_fct, shared_file, tmp_dir="/tmp", time_between_retry
between the concurrent processes.
@param tmp_dir: [str] The path to the directory where the temporary files
(request and lock) are stored.
@param time_between_retry: [float] the number of seconds between each
@param time_between_retry: [float] the number of seconds between each
retry.
@param max_stable_priorities: [int] The number of retries with the same
                                    priority before the situation is
                                    considered a deadlock.
@param metadata: [dict] The metadata added in access request file.
@return: The process_fct return value, or None if process_fct does not
         return anything.
@note: Bug if 1 node with a late timestamp executes the reservation, the
......@@ -151,11 +152,12 @@ def exec_on_shared( process_fct, shared_file, tmp_dir="/tmp", time_between_retry
current_request = {
'timestamp': time.time(),
'PID': os.getpid(),
'random': random.randint(1, 10000000000000000)
'random': random.randint(1, 10000000000000000),
'metadata': metadata
}
# Set request file
request_filename = "{}_accessRequest_{}|{}|{}".format(os.path.basename(shared_file), current_request['timestamp'], current_request['PID'], current_request['random'])
request_filename = "{}_accessRequest_{}-{}-{}".format(os.path.basename(shared_file), current_request['timestamp'], current_request['PID'], current_request['random'])
current_request_file = os.path.join( tmp_dir, request_filename )
try:
......
......@@ -60,8 +60,9 @@ class WorkflowsManager(object):
wdfh = open(self.dump_file, "wb")
pickle.dump(workflows_dump, wdfh)
wdfh.close()
concurrent.exec_on_shared( dump_func, self.dump_file, self.config_reader.get_tmp_directory(), 1, 200 )
workflows_ids = [wf.id for wf in workflows]
concurrent.exec_on_shared( dump_func, self.dump_file, self.config_reader.get_tmp_directory(), 1, 200, {"action": "Add wf", "wf_id": workflows_ids} )
def get_available_workflows(self, function="process"):
if function.__class__.__name__ == "str":
functions = [function]
......@@ -139,7 +140,7 @@ class WorkflowsManager(object):
awfh = open(self.dump_file, "wb")
pickle.dump(all_workflows_dump, awfh)
awfh.close()
concurrent.exec_on_shared( delete_func, self.dump_file, self.config_reader.get_tmp_directory(), 1, 200 )
concurrent.exec_on_shared( delete_func, self.dump_file, self.config_reader.get_tmp_directory(), 1, 200, {"action": "Delete wf", "wf_id": str(workflow_id)} )
def get_workflow_errors(self, workflow_id):
workflow = self.get_workflow(workflow_id)
......@@ -223,4 +224,4 @@ class WorkflowsManager(object):
ifh.write("1")
ifh.close()
return 1
return concurrent.exec_on_shared( next_id_func, self.ids_file, self.config_reader.get_tmp_directory() )
return concurrent.exec_on_shared( next_id_func, self.ids_file, self.config_reader.get_tmp_directory(), metadata={"action": "Get wf ID"} )
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment