Commit 0c65527d authored by Damien Leroux's avatar Damien Leroux
Browse files

Implemented temporary registry for concurrent identical tasks (so that two...

Implemented temporary registry for concurrent identical tasks (so that two non-mem-cached disk-writers won't interfere).
parent ebd60d17
#ifndef _SPEL_CACHE_REGISTRY_H_
#define _SPEL_CACHE_REGISTRY_H_
#include <mutex>
#include <unordered_map>
template <typename ValueType, typename... AllArgs>
struct computation_registry {
typedef ValueType value_type;
template <typename... Args> struct registry_impl;
template <typename Arg0>
struct registry_impl<Arg0> {
std::unordered_map<Arg0, value_type> m_registry;
value_type& get_(const Arg0& arg)
{
/*MSG_DEBUG("size=" << size() << " finding " << arg << "... " << m_registry[arg]);*/
return m_registry[arg];
}
bool remove_(const Arg0& car)
{
m_registry.erase(car);
return m_registry.size() == 0;
}
value_type* find_(const Arg0& car)
{
auto it = m_registry.find(car);
if (it == m_registry.end()) {
return NULL;
}
return &it->second;
}
size_t size() const { return m_registry.size(); }
};
template <typename Arg0, typename... Args>
struct registry_impl<Arg0, Args...> {
std::unordered_map<Arg0, registry_impl<Args...>> m_registry;
value_type& get_(const Arg0& car, const Args&... cdr)
{
/*MSG_DEBUG("size=" << size() << " finding " << car << "...");*/
return m_registry[car].get_(cdr...);
}
bool remove_(const Arg0& car, const Args&... cdr)
{
if (m_registry[car].remove_(cdr...)) {
m_registry.erase(car);
}
return m_registry.size() == 0;
}
value_type* find_(const Arg0& car, const Args&... cdr)
{
auto it = m_registry.find(car);
if (it == m_registry.end()) {
return NULL;
}
return it->second.find_(cdr...);
}
size_t size() const
{
size_t accum = 0;
for (auto& kv: m_registry) {
accum += kv.second.size();
}
return accum;
}
};
registry_impl<AllArgs...> m_registry;
template <typename... Args>
value_type& get(const Args&... args)
{
return m_registry.get_(args...);
}
template <typename... Args>
void remove(const Args&... args)
{
m_registry.remove_(args...);
}
template <typename... Args>
value_type* find(const Args&... args)
{
return m_registry.find_(args...);
}
size_t size() const { return m_registry.size(); }
};
#endif
......@@ -23,10 +23,16 @@ enum CachingPolicy : int { Oneshot = 0, Mem = 1, Disk = 2, Sync = 4 };
constexpr CachingPolicy operator | (CachingPolicy c1, CachingPolicy c2) { return CachingPolicy(int(c1) | int(c2)); }
constexpr CachingPolicy operator & (CachingPolicy c1, CachingPolicy c2) { return CachingPolicy(int(c1) & int(c2)); }
template <typename X> struct clean_type { typedef typename std::remove_reference<X>::type type; };
template <typename X> struct clean_type<const X&> { typedef X type; };
template <typename X> struct clean_type<X&> { typedef X type; };
template <typename X> struct clean_type<const X> { typedef X type; };
#include "error.h"
#include "input.h"
#include "cache/md5.h"
#include "cache/file.h"
#include "cache/registry.h"
extern "C" {
/*#include <dlfcn.h>*/
......@@ -84,11 +90,6 @@ get_func_name(Ret (&f) (Args...))
/* forward */ struct md5_digest;
template <typename X> struct clean_type { typedef typename std::remove_reference<X>::type type; };
template <typename X> struct clean_type<const X&> { typedef X type; };
template <typename X> struct clean_type<X&> { typedef X type; };
template <typename X> struct clean_type<const X> { typedef X type; };
static inline std::string& cache_directory() { return active_settings->work_directory; }
......@@ -198,6 +199,11 @@ bool operator == (const value<T>& v1, const value<T>& v2) { return v1.equal(v2);
template <typename T>
bool operator != (const value<T>& v1, const value<T>& v2) { return !v1.equal(v2); }
template <typename VT>
using clean_value_type = value<typename clean_type<VT>::type>;
namespace std {
template <typename T>
struct hash<value<T>> {
......@@ -386,6 +392,46 @@ template <typename ValueType>
/*: unique_value<labelled_matrix<M, R, C>> {};*/
template <typename Ret, typename... Args>
computation_registry<value<Ret>, Ret (*) (Args...), value<typename clean_type<Args>::type>...>&
__get_registry()
{
static computation_registry<value<Ret>, Ret (*) (Args...), value<typename clean_type<Args>::type>...> _reg_;
/*MSG_DEBUG("Registry at " << (&_reg_));*/
return _reg_;
}
template <typename Ret, typename... Args>
computation_registry<value<Ret>, Ret (*) (Args...), value<typename clean_type<Args>::type>...>&
__get_in_progress_registry()
{
static computation_registry<value<Ret>, Ret (*) (Args...), value<typename clean_type<Args>::type>...> _reg_;
/*MSG_DEBUG("Registry at " << (&_reg_));*/
return _reg_;
}
template <typename Ret, typename... Args>
std::mutex& __get_in_progress_mutex() { static std::mutex _; return _; }
template <typename Ret, typename... Args>
void unregister_task_in_progress(Ret (*f) (Args...), const clean_value_type<Args>&... x)
{
__get_in_progress_mutex<Ret, Args...>().lock();
__get_registry<Ret, Args...>().remove(f, x...);
__get_in_progress_mutex<Ret, Args...>().unlock();
}
template <typename Ret, typename... Args>
value<Ret> register_task_in_progress(Ret (&f) (Args...), const clean_value_type<Args>&... x, value<Ret>& v)
{
__get_in_progress_mutex<Ret, Args...>().lock();
__get_registry<Ret, Args...>().get(&f, x...) = v;
__get_in_progress_mutex<Ret, Args...>().unlock();
}
template <typename Ret, typename... Args>
struct async_computation<Ret(Args...)> {
......@@ -398,9 +444,7 @@ template <typename Ret, typename... Args>
const value<typename clean_type<Args>::type>&... args)
: dependencies(args...)
, m_storage()
, m_future(active_settings->enqueue(_Sync, [=] (Args... args) { chrono_trace _(get_func_name(func)); return func(args...); }, *args...))
/*, m_future(std::async(std::launch::async, func, *args...))*/
/*, m_future(std::async(std::launch::deferred, func, *args...))*/
, m_future(active_settings->enqueue(_Sync, [=] (Args... args) { chrono_trace _(get_func_name(func)); Ret ret = func(args...); unregister_task_in_progress(func, args...); return ret; }, *args...))
, mutex()
{}
......@@ -713,63 +757,6 @@ template <typename ValueType, typename Arg0, typename... OtherArgs>
};
#endif
template <typename ValueType, typename... AllArgs>
struct computation_registry {
typedef ValueType value_type;
template <typename... Args> struct registry_impl;
template <typename Arg0>
struct registry_impl<Arg0> {
std::unordered_map<Arg0, value_type> m_registry;
value_type& get_(const Arg0& arg)
{
/*MSG_DEBUG("size=" << size() << " finding " << arg << "... " << m_registry[arg]);*/
return m_registry[arg];
}
size_t size() const { return m_registry.size(); }
};
template <typename Arg0, typename... Args>
struct registry_impl<Arg0, Args...> {
std::unordered_map<Arg0, registry_impl<Args...>> m_registry;
value_type& get_(const Arg0& car, const Args&... cdr)
{
/*MSG_DEBUG("size=" << size() << " finding " << car << "...");*/
return m_registry[car].get_(cdr...);
}
size_t size() const
{
size_t accum = 0;
for (auto& kv: m_registry) {
accum += kv.second.size();
}
return accum;
}
};
registry_impl<AllArgs...> m_registry;
template <typename... Args>
value_type& get(const Args&... args)
{
return m_registry.get_(args...);
}
size_t size() const { return m_registry.size(); }
};
template <typename Ret, typename... Args>
computation_registry<value<Ret>, Ret (*) (Args...), value<typename clean_type<Args>::type>...>&
__get_registry()
{
static computation_registry<value<Ret>, Ret (*) (Args...), value<typename clean_type<Args>::type>...> _reg_;
/*MSG_DEBUG("Registry at " << (&_reg_));*/
return _reg_;
}
#if 0
static inline
......@@ -787,9 +774,6 @@ std::mutex& __get_lock(void* fptr)
}
#endif
template <typename VT>
using clean_value_type = value<typename clean_type<VT>::type>;
template <typename Ret, typename... Args>
struct with_disk_cache_traits {
......@@ -821,6 +805,13 @@ struct with_mem_cache_traits {
{
static std::mutex _;
std::unique_lock<std::mutex> lock_guard(_);
value<Ret>* ret_in_progress = __get_in_progress_registry<Ret, Args...>().find(&f, x...);
if (ret_in_progress) {
return *ret_in_progress;
}
/*MSG_DEBUG("new value with mem cache");*/
return_type ret = __get_registry<Ret, Args...>().get(&f, x...);
if (!ret.valid()) {
......@@ -839,6 +830,12 @@ struct without_mem_cache_traits {
return_type
create(CachingPolicy _Sync, Ret (&f) (Args...), const clean_value_type<Args>&... x)
{
value<Ret>* ret_in_progress = __get_in_progress_registry<Ret, Args...>().find(&f, x...);
if (ret_in_progress) {
return *ret_in_progress;
}
/*MSG_DEBUG("new value without mem cache");*/
return_type ret = new _Maker(_Sync, f, x...);
return ret;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment