34 #include "workloadmanager/WorkloadManager.hxx"
35 #include "workloadmanager/DefaultAlgorithm.hxx"
49 #define usleep(A) _sleep(A/1000)
50 #if !defined(S_ISCHR) || !defined(S_ISREG)
53 # define S_IFMT _S_IFMT
54 # define S_IFCHR _S_IFCHR
55 # define S_IFREG _S_IFREG
58 # define S_IFMT __S_IFMT
59 # define S_IFCHR __S_IFCHR
60 # define S_IFREG __S_IFREG
64 # define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
65 # define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
72 using YACS::BASES::Mutex;
73 using YACS::BASES::Thread;
74 using YACS::BASES::Semaphore;
82 Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
118 DEBTRACE(
"Executor::RunW debug: " << debug <<
" fromScratch: " << fromScratch);
131 vector<Task *> tasks;
132 vector<Task *>::iterator iter;
153 for(iter=tasks.begin();iter!=tasks.end();iter++)
237 DEBTRACE(
"Executor::RunB debug: "<< graph->
getName() <<
" "<< debug<<
" fromScratch: "<<fromScratch);
253 string tracefile =
"traceExec_";
255 _trace.open(tracefile.c_str());
256 _start = std::chrono::steady_clock::now();
281 vector<Task *>::iterator iter;
296 std::vector<Task *> tasks = graph->
getNextTasks(isMore);
329 std::cerr <<
"Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
332 if(problemCount > 25)
356 DEBTRACE(
"stop requested: End soon");
406 if (dumpRequested && xmlFile.empty())
407 throw YACS::Exception(
"dump on error requested and no filename given for dump");
432 DEBTRACE(
"Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
450 DEBTRACE(
"Executor::resumeCurrentBreakPoint()");
473 DEBTRACE(
"Graph Execution finished or stopped !");
493 DEBTRACE(
"Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
509 DEBTRACE(
"Executor::getTasksToLoad()");
510 list<string> listOfNodesToLoad;
511 listOfNodesToLoad.clear();
534 return listOfNodesToLoad;
547 DEBTRACE(
"Executor::setStepsToExecute(std::list<std::string> listToExecute)");
549 vector<Task *>::iterator iter;
550 vector<Task *> restrictedTasks;
562 if (find(listToExecute.begin(), listToExecute.end(), readyNode)
563 != listToExecute.end())
565 restrictedTasks.push_back(*iter);
566 DEBTRACE(
"node to execute " << readyNode);
570 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
589 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
596 DEBTRACE(
"selected node to execute " << readyNode);
680 DEBTRACE(
"Executor::saveState() in " << xmlFile);
691 std::cerr <<
ex.what() << std::endl;
709 if (stat(filename, &buf) != 0)
711 if (!S_ISREG(buf.st_mode))
731 std::ofstream g(
"titi");
734 const char displayScript[]=
"display.sh";
736 system(
"sh display.sh");
738 system(
"dot -Tpng titi|display -delay 5");
754 DEBTRACE(
"Executor::checkBreakPoints()");
755 vector<Task *>::iterator iter;
756 bool endRequested =
false;
798 if (stop)
DEBTRACE(
"wake up from waitResume");
824 DEBTRACE(
"wake up from waitResume");
828 DEBTRACE(
"endRequested: " << endRequested);
855 DEBTRACE(
"Executor::loadTask(Task *task)");
872 std::cerr <<
ex.what() << std::endl;
882 std::cerr <<
"Load failed" << std::endl;
901 std::vector<Thread> ths(tasks.size());
902 std::size_t ithread(0);
903 for(std::vector<Task *>::const_iterator iter =
_tasks.begin(); iter !=
_tasks.end(); iter++, ithread++)
905 DEBTRACE(
"Executor::loadParallelTasks(Task *task)");
907 args->
task = (*iter);
912 for(ithread=0;ithread<tasks.size();ithread++)
924 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
930 (*iter)->connectService();
934 (*iter)->connected();
939 std::cerr <<
ex.what() << std::endl;
942 (*iter)->disconnectService();
958 std::cerr <<
"Problem in connectService" << std::endl;
961 (*iter)->disconnectService();
978 std::set<Task*> coupledSet;
979 (*iter)->getCoupledTasks(coupledSet);
980 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
983 if(
t == *iter)
continue;
987 t->disconnectService();
1007 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
1024 DEBTRACE(
"Executor::launchTask(Task *task)");
1035 std::set<Task*>::iterator it = tmpSet.begin();
1036 std::string status=
"running";
1037 std::set<Task*> coupledSet;
1038 while( it != tmpSet.end() )
1044 for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1047 tmpSet.erase(*iter);
1049 if(status==
"running")
break;
1050 it = tmpSet.begin();
1053 if(status==
"toactivate")
1055 std::cerr <<
"WARNING: maybe you need more threads to run your schema (current value="<<
_maxThreads <<
")" << std::endl;
1056 std::cerr <<
"If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1084 DEBTRACE(
"Executor::sleepWhileNoEventsFromAnyRunningTask()");
1142 DEBTRACE(
"Executor::functionForTaskLoad(void *arg)");
1167 DEBTRACE(
"Executor::functionForTaskExecution(void *arg)");
1184 if(node!=0 && gfn!=0)
1197 std::cerr <<
"YACS Exception during execute" << std::endl;
1198 std::cerr <<
ex.what() << std::endl;
1200 string message =
"end execution ABORT, ";
1201 message +=
ex.what();
1207 std::cerr <<
"Execution has failed: unknown reason" << std::endl;
1215 DEBTRACE(
"task->disconnectService()");
1222 std::cerr <<
"disconnect has failed" << std::endl;
1235 std::lock_guard<std::mutex> alckCont(contC->
getLocker());
1262 std::cerr <<
"Error during notification" << std::endl;
1263 std::cerr <<
ex.what() << std::endl;
1269 std::cerr <<
"Notification failed" << std::endl;
1305 string containerName =
"---";
1307 containerName = cont->
getName();
1309 std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
1310 std::chrono::milliseconds millisec;
1311 millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -
_start);
1312 double elapse = double(millisec.count()) / 1000.0;
1315 _trace << elapse <<
" " << containerName <<
" " << placement <<
" " << nodeName <<
" " << message << endl;
1354 std::map<HomogeneousPoolContainer *, std::vector<Task *>,
HPCCompare >
m;
1355 for(
auto cur : tsks)
1362 m[
nullptr].push_back(cur);
1368 m[
nullptr].push_back(cur);
1371 m[contC].push_back(cur);
1374 std::vector<Task *> ret;
1378 const std::vector<Task *>& curtsks(it.second);
1383 std::uint32_t nbOfCandidates =
static_cast<std::uint32_t
>( curtsks.size() );
1384 std::uint32_t nbOfCandidatesToBeLaunched = std::min(nbOfCandidates,nbOfFreeSpace);
1385 DEBTRACE(
"nb threads running: " << nbThreadsRunning);
1387 DEBTRACE(
"nbOfFreeSpace: " << nbOfFreeSpace);
1388 DEBTRACE(
"nbOfCandidates: " << nbOfCandidates);
1389 DEBTRACE(
"nbOfCandidatesToBeLaunched: " << nbOfCandidatesToBeLaunched);
1390 ret.insert(ret.end(),curtsks.begin(),curtsks.begin() + nbOfCandidatesToBeLaunched);
1395 std::lock_guard<std::mutex> alckCont(curhpc->
getLocker());
1396 std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1398 std::vector<Task *>::const_iterator it2(curtsks.begin());
1399 for(std::size_t
i=0;
i<sz && it2!=curtsks.end();
i++,it2++)
1401 vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1402 ret.push_back(*it2);
1404 curhpc->
allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1414 std::string placement(
"---");
1434 std::ostringstream container_name;
1435 container_name << runInfo.type.name <<
"-" << runInfo.index;
1436 task->
imposeResource(runInfo.resource.name, container_name.str());
1444 std::cerr <<
ex.what() << std::endl;
1454 std::cerr <<
"Load failed" << std::endl;
1496 std::cerr <<
"Error during notification" << std::endl;
1497 std::cerr <<
ex.what() << std::endl;
1503 std::cerr <<
"Notification failed" << std::endl;
1527 if(elemNode !=
nullptr)
1545 Node *node(
dynamic_cast<Node *
>(task));
1547 if(node!=0 && gfn!=0)
1560 std::cerr <<
"YACS Exception during execute" << std::endl;
1561 std::cerr <<
ex.what() << std::endl;
1563 string message =
"end execution ABORT, ";
1564 message +=
ex.what();
1570 std::cerr <<
"Execution has failed: unknown reason" << std::endl;
1578 DEBTRACE(
"task->disconnectService()");
1585 std::cerr <<
"disconnect has failed" << std::endl;
1598 std::lock_guard<std::mutex> alckCont(contC->
getLocker());
1621 std::cerr <<
ex.what() << std::endl;
1624 (task)->disconnectService();
1640 std::cerr <<
"Problem in connectService" << std::endl;
1643 (task)->disconnectService();
1660 std::set<Task*> coupledSet;
1662 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
1665 if(
t == task)
continue;
1669 t->disconnectService();
1690 DEBTRACE(
"Executor::runWlm debug: "<< graph->
getName() <<
" "<< debug<<
" fromScratch: "<<fromScratch);
1705 string tracefile =
"traceExec_";
1707 _trace.open(tracefile.c_str());
1708 _start = std::chrono::steady_clock::now();
1740 WorkloadManager::DefaultAlgorithm algo;
1741 WorkloadManager::WorkloadManager wlm(algo);
1747 DEBTRACE(
"--- executor main loop");
1753 std::vector<Task *> readyTasks=graph->
getNextTasks(isMore);
1756 for(
Task *
t : readyTasks)
1772 wlm.addTask(newTask);
1789 std::cerr <<
"Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
1792 if(problemCount > 25)
1817 DEBTRACE(
"stop requested: End soon");
1835 std::string str_value = graph->
getProperty(
"executor");
1836 if(str_value ==
"WorkloadManager"
1837 || str_value ==
"WORKLOADMANAGER"
1838 || str_value ==
"workloadmanager"
1839 || str_value ==
"WorkLoadManager")
1840 runWlm(graph, debug, fromScratch);
1842 RunB(graph, debug, fromScratch);
static int isfile(const char *filename)
#define YASSERT(val)
YASSERT macro is always defined; it is used like assert, but throws a YACS::Exception instead of aborting.
Base class for all composed nodes.
void accept(Visitor *visitor)
virtual std::string getFullPlacementId(const Task *askingNode) const =0
virtual std::string getName() const
Base class for dispatcher in observer pattern.
virtual void dispatch(Node *object, const std::string &event)
static Dispatcher * getDispatcher()
Base class for all calculation nodes.
void setMaxNbOfThreads(int maxNbThreads)
std::uint32_t _maxNbThreads
void displayDot(Scheduler *graph)
Display the graph state as a dot display, public method.
std::list< std::string > _listOfTasksToLoad
virtual void sendEvent(const std::string &event)
emit notification to all observers registered with the dispatcher
YACS::ExecutionMode getCurrentExecMode()
YACS::BASES::Mutex _mutexForTrace
std::vector< Task * > _tasks
bool setStepsToExecute(std::list< std::string > listToExecute)
Define a subset of tasks to execute in step-by-step mode.
std::list< std::string > getTasksToLoad()
Get the list of tasks to load, to define a subset to execute in step by step mode.
bool _isWaitingEventsFromRunningTasks
bool _isRunningunderExternalControl
void failTask(Task *task, const std::string &message)
bool saveState(const std::string &xmlFile)
save the current state of execution in an xml file
void makeDatastreamConnections(Task *task)
YACS::ExecutionMode _execMode
std::set< Task * > _runningTasks
bool loadState()
not yet implemented
void launchTask(Task *task)
Execute a Task in a thread.
static size_t _threadStackSize
void resume(bool suspended)
void launchTasks(const std::vector< Task * > &tasks)
Execute a list of tasks possibly connected through datastream links.
void waitPause()
suspend pilot execution until the Executor is paused or waiting for task completion.
bool _dumpOnErrorRequested
void setExecMode(YACS::ExecutionMode mode)
Dynamically set the current mode of execution.
static void * functionForTaskExecution(void *)
Function to perform execution of a task in a thread.
void loadParallelTasks(const std::vector< Task * > &tasks, const Executor *execInst)
bool checkBreakPoints()
Wait for reactivation in step-by-step mode or in breakpoint mode.
YACS::BASES::Mutex _mutexForSchedulerUpdate
YACS::Event runTask(Task *task)
bool resumeCurrentBreakPoint()
wake up executor when in pause
void sleepWhileNoEventsFromAnyRunningTask()
wait until a running task ends
void _displayDot(Scheduler *graph)
Display the graph state as a dot display.
void RunA(Scheduler *graph, int debug=0, bool fromScratch=true)
Execute a graph waiting for completion.
void stopExecution()
stops the execution as soon as possible
std::string _dumpErrorFile
void setStopOnError(bool dumpRequested=false, std::string xmlFile="")
ask to stop execution on the first node found in error
std::vector< Task * > _tasksSave
YACS::BASES::Condition _condForNewTasksToPerform
std::list< std::string > _listOfBreakPoints
void RunB(Scheduler *graph, int debug=0, bool fromScratch=true)
Execute a graph with breakpoints or step by step.
void wakeUp()
must be used protected by _mutexForSchedulerUpdate!
void unsetStopOnError()
ask not to stop execution on nodes found in error
YACS::BASES::Semaphore _semForMaxThreads
bool getDPLScopeSensitive() const
static std::string ComputePlacement(Task *zeTask)
void RunW(Scheduler *graph, int debug=0, bool fromScratch=true)
YACS::ExecutorState getExecutorState()
void endTask(Task *task, YACS::Event ev)
YACS::ExecutorState _executorState
void loadTask(Task *task, const WorkloadManager::RunInfo &runInfo)
void filterTasksConsideringContainers(std::vector< Task * > &tsks)
void runWlm(Scheduler *graph, int debug=0, bool fromScratch=true)
bool _stopOnErrorRequested
int _numberOfRunningTasks
static void * functionForTaskLoad(void *)
void setListOfBreakPoints(std::list< std::string > listOfBreakPoints)
define a list of node names as breakpoints in the graph
void traceExec(Task *task, const std::string &message, const std::string &placement)
int getMaxNbOfThreads() const
void beginTask(Task *task)
YACS::BASES::Mutex _mutexForNbOfConcurrentThreads
int getNumberOfRunningTasks()
number of running tasks
YACS::BASES::Condition _condForPilot
std::chrono::steady_clock::time_point _start
YACS::BASES::Condition _condForStepByStep
void waitResume()
in step-by-step mode or in breakpoint mode, wait until the pilot resumes the execution
std::mutex & getLocker() const
virtual std::size_t getNumberOfFreePlace() const =0
virtual void release(const Task *node)=0
virtual int getNumberOfCoresPerWorker() const =0
virtual void allocateFor(const std::vector< const Task * > &nodes)=0
Base class for all nodes.
virtual void setErrorDetails(const std::string &error)
static std::string getStateName(YACS::StatesForNode state)
Return the name of a state.
virtual void applyDPLScope(ComposedNode *gfn)
virtual std::vector< Task * > getNextTasks(bool &isMore)=0
virtual std::string getName() const =0
virtual std::string getTaskName(Task *task) const =0
virtual bool isFinished()=0
virtual void notifyFrom(const Task *sender, YACS::Event event, const Executor *execInst)=0
virtual void init(bool start=true)=0
virtual void selectRunnableTasks(std::vector< Task * > &tasks)=0
virtual void exUpdateState()=0
virtual std::string getProperty(const std::string &name)=0
virtual void initService()=0
virtual void imposeResource(const std::string &resource_name, const std::string &container_name)
virtual void connected()=0
virtual void finished()=0
virtual void getCoupledTasks(std::set< Task * > &coupledSet)=0
virtual Container * getContainer()=0
virtual void disconnectService()=0
virtual void connectService()=0
virtual YACS::StatesForNode getState() const =0
void openFileDump(const std::string &xmlDump)
static void loadResources(WorkloadManager::WorkloadManager &wm)
void YACSLIBENGINE_EXPORT StateLoader(Node *node, YACS::StatesForNode state)
bool operator()(HomogeneousPoolContainer *lhs, HomogeneousPoolContainer *rhs) const