Version: 9.12.0
Executor.cxx
Go to the documentation of this file.
1 // Copyright (C) 2006-2023 CEA, EDF
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19 
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
27 #include "ComponentInstance.hxx"
28 
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
32 
33 #include "WlmTask.hxx"
34 #include "workloadmanager/WorkloadManager.hxx"
35 #include "workloadmanager/DefaultAlgorithm.hxx"
36 
37 #include <iostream>
38 #include <fstream>
39 #include <sys/stat.h>
40 #ifndef WIN32
41 #include <sys/time.h>
42 #include <unistd.h>
43 #endif
44 
45 #include <cstdlib>
46 #include <algorithm>
47 
48 #ifdef WIN32
49 #define usleep(A) _sleep(A/1000)
50 #if !defined(S_ISCHR) || !defined(S_ISREG)
51 # ifndef S_IFMT
52 # ifdef _S_IFMT
53 # define S_IFMT _S_IFMT
54 # define S_IFCHR _S_IFCHR
55 # define S_IFREG _S_IFREG
56 # else
57 # ifdef __S_IFMT
58 # define S_IFMT __S_IFMT
59 # define S_IFCHR __S_IFCHR
60 # define S_IFREG __S_IFREG
61 # endif
62 # endif
63 # endif
64 # define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
65 # define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
66 #endif
67 #endif
68 
69 using namespace YACS::ENGINE;
70 using namespace std;
71 
72 using YACS::BASES::Mutex;
73 using YACS::BASES::Thread;
74 using YACS::BASES::Semaphore;
75 
76 //#define _DEVDEBUG_
77 #include "YacsTrace.hxx"
78 
79 int Executor::_maxThreads(1000);
80 size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
81 
82 Executor::Executor():_mainSched(NULL),_isWaitingEventsFromRunningTasks(false),_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
83 {
84  _root=0;
85  _toContinue = true;
86  _isOKToEnd = false;
87  _stopOnErrorRequested = false;
88  _dumpOnErrorRequested = false;
89  _errorDetected = false;
96  DEBTRACE("Executor initialized with max threads = " << _maxThreads);
97 }
98 
100 {
101 }
102 
104 
// Runs the scheduler `graph` in the calling thread until graph->isFinished() reports true.
// Each pass of the loop asks the scheduler for its next tasks, lets it keep the runnable ones,
// loads them one at a time with loadTask() and then passes them to launchTasks().
//  - graph: scheduler to execute; it must also be a ComposedNode, otherwise an Exception is thrown.
//  - debug: the higher the value, the more often _displayDot(graph) is called (thresholds >0, >1, >2).
//  - fromScratch: when true, graph->init() and graph->exUpdateState() are called before the loop.
// NOTE(review): listing lines 134-136, 138, 141, 146 and 163 are not present in this view;
// the comments only describe the statements that are visible.
116 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
117 {
 // NOTE(review): the trace text says "RunW" although this method is RunA.
118  DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
119  _mainSched=graph;
120  _root = dynamic_cast<ComposedNode *>(_mainSched);
121  if (!_root) throw Exception("Executor::Run, Internal Error!");
122  bool isMore;
 // i only counts loop iterations; no visible statement reads it.
123  int i=0;
124  if(debug>1)_displayDot(graph);
125  if (fromScratch)
126  {
127  graph->init();
128  graph->exUpdateState();
129  }
130  if(debug>1)_displayDot(graph);
131  vector<Task *> tasks;
132  vector<Task *>::iterator iter;
133  _toContinue=true;
137  _runningTasks.clear();
139  while(_toContinue)
140  {
142 
143  if(debug>2)_displayDot(graph);
144 
145  {//Critical section
147  tasks=graph->getNextTasks(isMore);
148  graph->selectRunnableTasks(tasks);
149  }//End of critical section
150 
151  if(debug>2)_displayDot(graph);
152 
 // Load every selected task sequentially before anything is launched.
153  for(iter=tasks.begin();iter!=tasks.end();iter++)
154  loadTask(*iter,this);
155 
156  if(debug>1)_displayDot(graph);
157 
158  launchTasks(tasks);
159 
160  if(debug>1)_displayDot(graph);
161 
162  {//Critical section
 // The loop ends as soon as the scheduler reports that it is finished.
164  _toContinue=!graph->isFinished();
165  }//End of critical section
166  DEBTRACE("_toContinue: " << _toContinue);
167 
168  if(debug>0)_displayDot(graph);
169 
170  i++;
171  }
172 }
173 
175 
// Main execution loop with event notification and breakpoint support.
//  - graph: scheduler to execute; it must also be a ComposedNode, otherwise an Exception is thrown.
//  - debug: the higher the value, the more often _displayDot(graph) is called (thresholds >0, >1, >2).
//  - fromScratch: when true, graph->init() and graph->exUpdateState() are called first; an
//    Exception raised there is traced, an "executor" event is sent and the exception is rethrown.
// The method opens the trace stream _trace on a file named "traceExec_" + scheduler name and
// records _start, which traceExec() uses as its time origin. The trace is closed before returning.
// NOTE(review): many numbered lines of this listing are not present in this view (for example
// 240, 244, 249-250, 252, 271, 276, 286, 291, 295, 298, 303, 308, 311, 317, 341, 353, 357,
// 362, 364, 367); the comments only describe the statements that are visible.
235 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
236 {
237  DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
238 
239  { // --- Critical section
241  _mainSched = graph;
242  _root = dynamic_cast<ComposedNode *>(_mainSched);
243  if (!_root) throw Exception("Executor::Run, Internal Error!");
245  sendEvent("executor");
246  _toContinue=true;
247  _isOKToEnd = false;
248  _errorDetected = false;
251  _runningTasks.clear();
 // The trace file name is relative, so it is resolved against the current working directory.
253  string tracefile = "traceExec_";
254  tracefile += _mainSched->getName();
255  _trace.open(tracefile.c_str());
256  _start = std::chrono::steady_clock::now();
257  } // --- End of critical section
258 
259  if (debug > 1) _displayDot(graph);
260 
261  if (fromScratch)
262  {
263  try
264  {
265  graph->init();
266  graph->exUpdateState();
267  }
268  catch(Exception& ex)
269  {
270  DEBTRACE("exception: "<< (ex.what()));
272  sendEvent("executor");
273  throw;
274  }
275  }
277  sendEvent("executor");
278 
279  if (debug > 1) _displayDot(graph);
280 
281  vector<Task *>::iterator iter;
282  bool isMore;
 // Counts consecutive-or-not passes where nothing runs and nothing can be launched.
283  int problemCount=0;
284  int numberAllTasks;
285 
287  sendEvent("executor");
288  while (_toContinue)
289  {
290  DEBTRACE("--- executor main loop");
292  DEBTRACE("--- events...");
293  if (debug > 2) _displayDot(graph);
294  { // --- Critical section
296  std::vector<Task *> tasks = graph->getNextTasks(isMore);
297  graph->selectRunnableTasks(tasks);
299  _tasks = tasks;
 // Tasks already running plus tasks about to be launched in this pass.
300  numberAllTasks=_numberOfRunningTasks+_tasks.size();
301  } // --- End of critical section
302  if (debug > 2) _displayDot(graph);
304  {
305  if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306  if (debug > 0) _displayDot(graph);
307  DEBTRACE("---");
309  if (debug > 1) _displayDot(graph);
310  DEBTRACE("---");
312  DEBTRACE("---");
313  }
314  if (debug > 1) _displayDot(graph);
315  { // --- Critical section
316  DEBTRACE("---");
318  //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
319  if(_numberOfRunningTasks == 0)
320  _toContinue = !graph->isFinished();
321 
322  DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
323  DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
324  DEBTRACE("_toContinue: " << _toContinue);
325  if(_toContinue && numberAllTasks==0)
326  {
327  //Problem : no running tasks and no task to launch ??
328  problemCount++;
329  std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
330  //Pause to give a chance to interrupt
331  usleep(1000);
 // The loop is abandoned once this situation has been seen more than 25 times.
332  if(problemCount > 25)
333  {
334  // Too much problems encountered : stop execution
335  _toContinue=false;
336  }
337  }
338 
339  if (! _toContinue)
340  {
342  sendEvent("executor");
343  _condForPilot.notify_all();
344  }
345  } // --- End of critical section
346  if (debug > 0) _displayDot(graph);
347  DEBTRACE("_toContinue: " << _toContinue);
348  }
349 
350  DEBTRACE("End of main Loop");
351 
352  { // --- Critical section
 // _toContinue still true here means the loop was left through the break on checkBreakPoints().
354  if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
355  {
356  DEBTRACE("stop requested: End soon");
358  _toContinue = false;
359  sendEvent("executor");
360  }
361  } // --- End of critical section
363  {
365  }
366  {
368  _trace.close();
369  }
370  DEBTRACE("End of RunB thread");
371 }
372 
374 {
376  return _execMode;
377 }
378 
379 
381 {
383  return _executorState;
384 }
385 
386 
388 {
390  return _toContinue;
391 }
392 
394 
// Records the dump-on-error settings of the executor.
//  - dumpRequested: stored in _dumpOnErrorRequested.
//  - xmlFile: stored in _dumpErrorFile.
// Throws YACS::Exception when a dump is requested with an empty file name.
// NOTE(review): both members are assigned before the argument check, so they keep the new
// values even when the exception is thrown — confirm this is intended.
// NOTE(review): listing lines 402 and 404 are not present in this view.
399 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
400 {
401  { // --- Critical section
403  _dumpErrorFile=xmlFile;
405  _dumpOnErrorRequested = dumpRequested;
406  if (dumpRequested && xmlFile.empty())
407  throw YACS::Exception("dump on error requested and no filename given for dump");
408  DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
409  } // --- End of critical section
410 }
411 
413 
417 {
418  { // --- Critical section
420  _stopOnErrorRequested=false;
421  } // --- End of critical section
422 }
423 
425 
431 {
432  DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
433  { // --- Critical section
436  _execMode = mode;
437  } // --- End of critical section
438 }
439 
441 
449 {
450  DEBTRACE("Executor::resumeCurrentBreakPoint()");
451  bool ret = false;
452  //bool doDump = false;
453  { // --- Critical section
456  DEBTRACE("_executorState: " << _executorState);
457  switch (_executorState)
458  {
459  case YACS::WAITINGTASKS:
460  case YACS::PAUSED:
461  {
462  _condForStepByStep.notify_all();
464  sendEvent("executor");
465  ret = true;
466  //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
467  break;
468  }
469  case YACS::FINISHED:
470  case YACS::STOPPED:
471  {
472  //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
473  DEBTRACE("Graph Execution finished or stopped !");
474  break;
475  }
476  default :
477  {
478  // debug: no easy way to verify if main loop is actually waiting on condition
479  }
480  }
481  DEBTRACE("---");
482  //if (doDump) saveState(_dumpErrorFile);
483  } // --- End of critical section
484  return ret;
485 }
486 
487 
489 
490 
// Replaces the stored breakpoint list (_listOfBreakPoints) with a copy of listOfBreakPoints.
// checkBreakPoints() compares these entries with the names returned by getTaskName().
// NOTE(review): listing lines 495-496 are not present in this view.
491 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
492 {
493  DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
494  { // --- Critical section
497  _listOfBreakPoints = listOfBreakPoints;
498  } // --- End of critical section
499 }
500 
501 
503 
// Returns a copy of _listOfTasksToLoad when the executor state is WAITINGTASKS or PAUSED,
// and an empty list in every other state.
// NOTE(review): listing lines 513-514 and 523 are not present in this view.
507 std::list<std::string> Executor::getTasksToLoad()
508 {
509  DEBTRACE("Executor::getTasksToLoad()");
510  list<string> listOfNodesToLoad;
 // A freshly constructed list is already empty; the clear() below has no effect.
511  listOfNodesToLoad.clear();
512  { // --- Critical section
515  switch (_executorState)
516  {
517  case YACS::WAITINGTASKS:
518  case YACS::PAUSED:
519  {
520  listOfNodesToLoad = _listOfTasksToLoad;
521  break;
522  }
524  case YACS::INITIALISED:
525  case YACS::RUNNING:
526  case YACS::FINISHED:
527  case YACS::STOPPED:
528  default:
529  {
530  break;
531  }
532  }
533  } // --- End of critical section
534  return listOfNodesToLoad;
535 }
536 
537 
539 
// Restricts the tasks to launch (_tasks) to those saved in _tasksSave whose name, as returned by
// _mainSched->getTaskName(), appears in listToExecute. The filtering only happens when the
// executor state is WAITINGTASKS or PAUSED.
// Returns true in every visible path (ret is never modified after its initialisation).
// NOTE(review): listing lines 552-553 and 576 are not present in this view.
545 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
546 {
547  DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
548  bool ret = true;
549  vector<Task *>::iterator iter;
550  vector<Task *> restrictedTasks;
551  { // --- Critical section
554  switch (_executorState)
555  {
556  case YACS::WAITINGTASKS:
557  case YACS::PAUSED:
558  {
559  for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
560  {
561  string readyNode = _mainSched->getTaskName(*iter);
562  if (find(listToExecute.begin(), listToExecute.end(), readyNode)
563  != listToExecute.end())
564  {
565  restrictedTasks.push_back(*iter);
566  DEBTRACE("node to execute " << readyNode);
567  }
568  }
569  _tasks.clear();
570  for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
571  {
572  _tasks.push_back(*iter);
573  }
574  break;
575  }
577  case YACS::INITIALISED:
578  case YACS::RUNNING:
579  case YACS::FINISHED:
580  case YACS::STOPPED:
581  default:
582  {
583  break;
584  }
585  }
586  } // --- End of critical section
587 
 // NOTE(review): _tasks is rebuilt a second time here, after the critical-section block.
 // In the WAITINGTASKS/PAUSED case this repeats the work done above; in every other state
 // restrictedTasks is empty, so _tasks ends up cleared. Confirm both effects are intended.
588  _tasks.clear();
589  for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
590  {
591  _tasks.push_back(*iter);
592  }
593  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
594  {
595  string readyNode = _mainSched->getTaskName(*iter);
596  DEBTRACE("selected node to execute " << readyNode);
597  }
598 
599  return ret;
600 }
601 
603 
609 {
610  DEBTRACE("Executor::waitPause()" << _executorState);
611  { // --- Critical section
614  switch (_executorState)
615  {
616  default:
617  case YACS::STOPPED:
618  case YACS::FINISHED:
619  case YACS::WAITINGTASKS:
620  case YACS::PAUSED:
621  {
622  break;
623  }
625  case YACS::INITIALISED:
626  case YACS::RUNNING:
627  {
628  _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
629  break;
630  }
631  }
632  } // --- End of critical section
633  DEBTRACE("---");
634 }
635 
644 {
645  // no AutoLocker here. It's not a bug.
648  {// execution is finished
649  _mutexForSchedulerUpdate.unLock();
650  return false;// the executor is no more running
651  }
652  //general case. Leave method with locker in locked status
653  return true;
654 }
655 
660 void Executor::resume(bool suspended)
661 {
662  if(suspended)
663  _mutexForSchedulerUpdate.unLock();
664 }
665 
667 
669 {
671  //waitPause();
672  _isOKToEnd = true;
674 }
675 
677 
// Dumps the state of the node tree rooted at _root into the file xmlFile through the visitor vst.
// Returns true when the dump completed, false when a YACS Exception was caught (its message is
// written to std::cerr). Other exception types are not caught here.
// NOTE(review): listing lines 683-684, which include the declaration of vst, are not present in
// this view.
678 bool Executor::saveState(const std::string& xmlFile)
679 {
680  DEBTRACE("Executor::saveState() in " << xmlFile);
681  bool result = false;
682  try {
685  vst.openFileDump(xmlFile.c_str());
686  _root->accept(&vst);
687  vst.closeFileDump();
688  result = true;
689  }
690  catch(Exception& ex) {
691  std::cerr << ex.what() << std::endl;
692  }
693  return result;
694 }
695 
697 
699 {
700  DEBTRACE("Executor::loadState()");
702  return true;
703 }
704 
705 
// Tells whether `filename` designates a regular file.
// Returns 1 when stat() succeeds and the entry is a regular file, 0 in every other case
// (stat() failure, directory, device, ...).
static int isfile(const char *filename)
{
  struct stat info;
  const bool statSucceeded = (stat(filename, &info) == 0);
  return (statSucceeded && S_ISREG(info.st_mode)) ? 1 : 0;
}
715 
717 
719 {
721  _displayDot(graph);
722 }
723 
725 
730 {
731  std::ofstream g("titi");
732  ((ComposedNode*)graph)->writeDot(g);
733  g.close();
734  const char displayScript[]="display.sh";
735  if(isfile(displayScript))
736  system("sh display.sh");
737  else
738  system("dot -Tpng titi|display -delay 5");
739 }
740 
742 
753 {
754  DEBTRACE("Executor::checkBreakPoints()");
755  vector<Task *>::iterator iter;
756  bool endRequested = false;
757 
758  switch (_execMode)
759  {
760  case YACS::CONTINUE:
761  {
762  break;
763  }
765  {
766  bool stop = false;
767  { // --- Critical section
769  _tasksSave = _tasks;
770  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
771  {
772  string nodeToLoad = _mainSched->getTaskName(*iter);
773  if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
774  != _listOfBreakPoints.end())
775  {
776  stop = true;
777  break;
778  }
779  }
780  if (stop)
781  {
782  _listOfTasksToLoad.clear();
783  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
784  {
785  string nodeToLoad = _mainSched->getTaskName(*iter);
786  _listOfTasksToLoad.push_back(nodeToLoad);
787  }
788  if (getNbOfThreads())
789  _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
790  else
792  sendEvent("executor");
793  _condForPilot.notify_all();
794  }
795  if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
796  if (_isOKToEnd) endRequested = true;
797  } // --- End of critical section
798  if (stop) DEBTRACE("wake up from waitResume");
799  break;
800  }
801  default:
802  case YACS::STEPBYSTEP:
803  {
804  { // --- Critical section
806  _tasksSave = _tasks;
807  _listOfTasksToLoad.clear();
808  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
809  {
810  string nodeToLoad = _mainSched->getTaskName(*iter);
811  _listOfTasksToLoad.push_back(nodeToLoad);
812  }
813  if (getNbOfThreads())
814  _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
815  else
817  sendEvent("executor");
818  _condForPilot.notify_all();
819  if (!_isOKToEnd)
820  waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
821  // or, if no pilot, wait until no more running tasks (stop on error)
822  if (_isOKToEnd) endRequested = true;
823  } // --- End of critical section
824  DEBTRACE("wake up from waitResume");
825  break;
826  }
827  }
828  DEBTRACE("endRequested: " << endRequested);
829  return endRequested;
830 }
831 
832 
834 
841 {
842  DEBTRACE("Executor::waitResume()");
843  _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
844  DEBTRACE("---");
845 }
846 
847 
849 
// Loads one task: notifies the scheduler with YACS::START, then calls task->load() and
// task->initService(), tracing each step through traceExec().
// Tasks whose state is not YACS::TOLOAD are ignored.
// No exception leaves this method: on any failure the task is marked aborted, the scheduler is
// notified with YACS::ABORT and the resulting state is traced.
//  - task: task to load.
//  - execInst: executor instance forwarded to _mainSched->notifyFrom().
// NOTE(review): listing lines 860, 874 and 884 are not present in this view.
853 void Executor::loadTask(Task *task, const Executor *execInst)
854 {
855  DEBTRACE("Executor::loadTask(Task *task)");
856  if(task->getState() != YACS::TOLOAD)
857  return;
858  traceExec(task, "state:TOLOAD", ComputePlacement(task));
859  {//Critical section
861  _mainSched->notifyFrom(task,YACS::START,execInst);
862  }//End of critical section
863  try
864  {
865  traceExec(task, "load", ComputePlacement(task));
866  task->load();
867  traceExec(task, "initService", ComputePlacement(task));
868  task->initService();
869  }
870  catch(Exception& ex)
871  {
 // YACS exception: its message is reported on std::cerr before the task is aborted.
872  std::cerr << ex.what() << std::endl;
873  {//Critical section
875  task->aborted();
876  _mainSched->notifyFrom(task,YACS::ABORT,execInst);
877  traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
878  }//End of critical section
879  }
880  catch(...)
881  {
 // Any other exception: same abort sequence, with a generic message.
882  std::cerr << "Load failed" << std::endl;
883  {//Critical section
885  task->aborted();
886  _mainSched->notifyFrom(task,YACS::ABORT,execInst);
887  traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
888  }//End of critical section
889  }
890 }
891 
893 {
897 };
898 
899 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
900 {
901  std::vector<Thread> ths(tasks.size());
902  std::size_t ithread(0);
903  for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
904  {
905  DEBTRACE("Executor::loadParallelTasks(Task *task)");
906  struct threadargs *args(new threadargs);
907  args->task = (*iter);
908  args->sched = _mainSched;
909  args->execInst = this;
910  ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
911  }
912  for(ithread=0;ithread<tasks.size();ithread++)
913  ths[ithread].join();
914 }
915 
917 
// Launches a set of tasks in two phases.
// Phase 1: for each task in state TOLOAD or TORECONNECT, connectService() is called and the task
// is marked connected. When that throws, disconnectService() is attempted, the task is marked
// aborted and the scheduler is notified with YACS::ABORT. If the task then is in state ERROR,
// every other task returned by getCoupledTasks() that is not already in ERROR is disconnected
// and marked aborted as well.
// Phase 2: launchTask() is called for every task of the vector.
// NOTE(review): listing lines 933, 951, 970, 996 and 998 are not present in this view.
921 void Executor::launchTasks(const std::vector<Task *>& tasks)
922 {
923  //First phase, make datastream connections
924  for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
925  {
926  YACS::StatesForNode state=(*iter)->getState();
927  if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
928  try
929  {
930  (*iter)->connectService();
931  traceExec(*iter, "connectService",ComputePlacement(*iter));
932  {//Critical section
934  (*iter)->connected();
935  }//End of critical section
936  }
937  catch(Exception& ex)
938  {
939  std::cerr << ex.what() << std::endl;
940  try
941  {
942  (*iter)->disconnectService();
943  traceExec(*iter, "disconnectService",ComputePlacement(*iter));
944  }
945  catch(...)
946  {
947  // Disconnect has failed
948  traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
949  }
950  {//Critical section
952  (*iter)->aborted();
953  _mainSched->notifyFrom(*iter,YACS::ABORT,this);
954  }//End of critical section
955  }
956  catch(...)
957  {
 // Same recovery sequence as the Exception handler above, with a generic message.
958  std::cerr << "Problem in connectService" << std::endl;
959  try
960  {
961  (*iter)->disconnectService();
962  traceExec(*iter, "disconnectService",ComputePlacement(*iter));
963  }
964  catch(...)
965  {
966  // Disconnect has failed
967  traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
968  }
969  {//Critical section
971  (*iter)->aborted();
972  _mainSched->notifyFrom(*iter,YACS::ABORT,this);
973  }//End of critical section
974  }
975  if((*iter)->getState() == YACS::ERROR)
976  {
977  //try to put all coupled tasks in error
978  std::set<Task*> coupledSet;
979  (*iter)->getCoupledTasks(coupledSet);
980  for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
981  {
982  Task* t=*it;
983  if(t == *iter)continue;
984  if(t->getState() == YACS::ERROR)continue;
 // NOTE(review): the traces for the coupled task t below are written with the placement of
 // *iter, not of t — confirm this is intended.
985  try
986  {
987  t->disconnectService();
988  traceExec(t, "disconnectService",ComputePlacement(*iter));
989  }
990  catch(...)
991  {
992  // Disconnect has failed
993  traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
994  }
995  {//Critical section
997  t->aborted();
999  }//End of critical section
1000  traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
1001  }
1002  }
1003  traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
1004  }
1005 
1006  //Second phase, execute each task in a thread
1007  for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
1008  {
1009  launchTask(*iter);
1010  }
1011 }
1012 
1014 
1023 {
1024  DEBTRACE("Executor::launchTask(Task *task)");
1025  struct threadargs *args;
1026  if(task->getState() != YACS::TOACTIVATE)return;
1027 
1028  DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
1029  if(_semThreadCnt == 0)
1030  {
1031  // --- Critical section
1033  //check if we have enough threads to run
1034  std::set<Task*> tmpSet=_runningTasks;
1035  std::set<Task*>::iterator it = tmpSet.begin();
1036  std::string status="running";
1037  std::set<Task*> coupledSet;
1038  while( it != tmpSet.end() )
1039  {
1040  Task* tt=*it;
1041  coupledSet.clear();
1042  tt->getCoupledTasks(coupledSet);
1043  status="running";
1044  for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1045  {
1046  if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1047  tmpSet.erase(*iter);
1048  }
1049  if(status=="running")break;
1050  it = tmpSet.begin();
1051  }
1052 
1053  if(status=="toactivate")
1054  {
1055  std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1056  std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1057  }
1058  // --- End of critical section
1059  }
1060 
1061  _semForMaxThreads.wait();
1062  _semThreadCnt -= 1;
1063 
1064  args= new threadargs;
1065  args->task = task;
1066  args->sched = _mainSched;
1067  args->execInst = this;
1068 
1069  traceExec(task, "launch",ComputePlacement(task));
1070 
1071  { // --- Critical section
1074  _runningTasks.insert(task);
1075  task->begin(); //change state to ACTIVATED
1076  } // --- End of critical section
1078 }
1079 
1081 
1083 {
1084  DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1085 // _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1088  {
1090  _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1091  }
1093  DEBTRACE("---");
1094 }
1095 
1096 
1098 
1100 {
1101  DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1103  {
1105  _condForNewTasksToPerform.notify_all();
1106  }
1107  else
1109 }
1110 
1112 {
1113  return (int)_maxNbThreads;
1114 }
1115 
1116 void Executor::setMaxNbOfThreads(int maxNbThreads)
1117 {
1118  _maxNbThreads = static_cast< std::uint32_t >(maxNbThreads);
1119 }
1120 
1122 {
1123  int ret = 0;
1126  return ret;
1127 }
1128 
1131 {
1134  return _numberOfRunningTasks;
1135 }
1136 
1141 {
1142  DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1143  struct threadargs *args = (struct threadargs *) arg;
1144  Task *task=args->task;
1145  Scheduler *sched=args->sched;
1146  Executor *execInst=args->execInst;
1147  delete args;
1148  execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1149  return 0;
1150 }
1151 
1153 
1166 {
1167  DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1168 
1169  struct threadargs *args = (struct threadargs *) arg;
1170  Task *task=args->task;
1171  Scheduler *sched=args->sched;
1172  Executor *execInst=args->execInst;
1173  delete args;
1175 
1176  Thread::detach();
1177 
1178  // Execute task
1179 
1181  {
1182  Node *node(dynamic_cast<Node *>(task));
1183  ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
1184  if(node!=0 && gfn!=0)
1185  node->applyDPLScope(gfn);
1186  }
1187 
1189  try
1190  {
1191  execInst->traceExec(task, "start execution",ComputePlacement(task));
1192  task->execute();
1193  execInst->traceExec(task, "end execution OK",ComputePlacement(task));
1194  }
1195  catch(Exception& ex)
1196  {
1197  std::cerr << "YACS Exception during execute" << std::endl;
1198  std::cerr << ex.what() << std::endl;
1199  ev=YACS::ABORT;
1200  string message = "end execution ABORT, ";
1201  message += ex.what();
1203  }
1204  catch(...)
1205  {
1206  // Execution has failed
1207  std::cerr << "Execution has failed: unknown reason" << std::endl;
1208  ev=YACS::ABORT;
1209  execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1210  }
1211 
1212  // Disconnect task
1213  try
1214  {
1215  DEBTRACE("task->disconnectService()");
1217  execInst->traceExec(task, "disconnectService",ComputePlacement(task));
1218  }
1219  catch(...)
1220  {
1221  // Disconnect has failed
1222  std::cerr << "disconnect has failed" << std::endl;
1223  ev=YACS::ABORT;
1224  execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1225  }
1226  //
1227 
1228  std::string placement(ComputePlacement(task));
1229 
1230  // container management for HomogeneousPoolOfContainer
1231 
1233  if(contC)
1234  {
1235  std::lock_guard<std::mutex> alckCont(contC->getLocker());
1236  contC->release(task);
1237  }
1238 
1239  DEBTRACE("End task->execute()");
1240  { // --- Critical section
1242  try
1243  {
1244  if (ev == YACS::FINISH) task->finished();
1245  if (ev == YACS::ABORT)
1246  {
1247  execInst->_errorDetected = true;
1249  {
1251  execInst->_isOKToEnd = true;
1252  }
1253  task->aborted();
1254  }
1255  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1257  }
1258  catch(Exception& ex)
1259  {
1260  //notify has failed : it is supposed to have set state
1261  //so no need to do anything
1262  std::cerr << "Error during notification" << std::endl;
1263  std::cerr << ex.what() << std::endl;
1264  }
1265  catch(...)
1266  {
1267  //notify has failed : it is supposed to have set state
1268  //so no need to do anything
1269  std::cerr << "Notification failed" << std::endl;
1270  }
1272  execInst->_runningTasks.erase(task);
1273  DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks
1274  << " _execMode: " << execInst->_execMode
1275  << " _executorState: " << execInst->_executorState);
1276  if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1277  {
1279  {
1281  execInst->sendEvent("executor");
1282  execInst->_condForPilot.notify_all();
1283  if (execInst->_errorDetected &&
1286  execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1287  }
1288  }
1289  DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1290  execInst->_semForMaxThreads.post();
1291  execInst->_semThreadCnt += 1;
1292  DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1294 
1295  } // --- End of critical section (change state)
1296 
1297  Thread::exit(0);
1298  return 0;
1299 }
1300 
// Appends one line to the execution trace stream _trace, with the fields separated by spaces:
//   <elapsed seconds> <container name> <placement> <node name> <message>
// The elapsed time is measured from _start with millisecond resolution. The container name is
// "---" when the task has no container. The stream is flushed after each line.
//  - task: task the trace line is about.
//  - message: free text describing the event.
//  - placement: placement string, usually obtained from ComputePlacement().
// NOTE(review): listing line 1314 is not present in this view.
1301 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1302 {
1303  string nodeName = _mainSched->getTaskName(task);
1304  Container *cont = task->getContainer();
1305  string containerName = "---";
1306  if (cont)
1307  containerName = cont->getName();
1308 
1309  std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
1310  std::chrono::milliseconds millisec;
1311  millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
1312  double elapse = double(millisec.count()) / 1000.0;
1313  {
1315  _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1316  _trace << flush;
1317  }
1318 }
1319 
1321 
// Dispatches `event` for the root node _root through the dispatcher disp.
// Both disp and _root are checked with YASSERT before the call.
// NOTE(review): listing line 1326, which declares disp, is not present in this view.
1324 void Executor::sendEvent(const std::string& event)
1325 {
1327  YASSERT(disp);
1328  YASSERT(_root);
1329  disp->dispatch(_root,event);
1330 }
1331 
1333 {
1335  {
1336  if(!lhs && !rhs)
1337  return false;
1338  if(!lhs)
1339  return true;
1340  if(!rhs)
1341  return false;
1343  }
1344 };
1345 
1352 void Executor::filterTasksConsideringContainers(std::vector<Task *>& tsks)
1353 {
1354  std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
1355  for(auto cur : tsks)
1356  {
1357  if(!cur)
1358  continue;
1359  Container *cont(cur->getContainer());
1360  if(!cont)
1361  {
1362  m[nullptr].push_back(cur);
1363  continue;
1364  }
1365  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1366  if(!contC)
1367  {
1368  m[nullptr].push_back(cur);
1369  continue;
1370  }
1371  m[contC].push_back(cur);
1372  }
1373  //
1374  std::vector<Task *> ret;
1375  for(auto it : m)
1376  {
1377  HomogeneousPoolContainer *curhpc(it.first);
1378  const std::vector<Task *>& curtsks(it.second);
1379  if(!curhpc)
1380  {
1381  std::uint32_t nbThreadsRunning = _runningTasks.size();
1382  std::uint32_t nbOfFreeSpace = _maxNbThreads - min(_maxNbThreads,nbThreadsRunning);
1383  std::uint32_t nbOfCandidates = static_cast<std::uint32_t>( curtsks.size() );
1384  std::uint32_t nbOfCandidatesToBeLaunched = std::min(nbOfCandidates,nbOfFreeSpace);
1385  DEBTRACE("nb threads running: " << nbThreadsRunning);
1386  DEBTRACE("MaxNbThreads: " << _maxNbThreads);
1387  DEBTRACE("nbOfFreeSpace: " << nbOfFreeSpace);
1388  DEBTRACE("nbOfCandidates: " << nbOfCandidates);
1389  DEBTRACE("nbOfCandidatesToBeLaunched: " << nbOfCandidatesToBeLaunched);
1390  ret.insert(ret.end(),curtsks.begin(),curtsks.begin() + nbOfCandidatesToBeLaunched);
1391  }
1392  else
1393  {
1394  // start of critical section for container curhpc
1395  std::lock_guard<std::mutex> alckCont(curhpc->getLocker());
1396  std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1397  std::size_t sz(curhpc->getNumberOfFreePlace());
1398  std::vector<Task *>::const_iterator it2(curtsks.begin());
1399  for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1400  {
1401  vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1402  ret.push_back(*it2);
1403  }
1404  curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1405  //end of critical section
1406  }
1407  }
1408  //
1409  tsks=ret;
1410 }
1411 
1412 std::string Executor::ComputePlacement(Task *zeTask)
1413 {
1414  std::string placement("---");
1415  if(!zeTask)
1416  return placement;
1417  if(zeTask->getContainer())
1418  placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1419  return placement;
1420 }
1421 
// Loads one task on the resource described by runInfo.
// Tasks whose state is not YACS::TOLOAD are ignored. Otherwise the scheduler is notified with
// YACS::START, the resource is imposed on the task, then task->load() and task->initService()
// are called, each step being traced through traceExec().
// The container name given to imposeResource() is "<runInfo.type.name>-<runInfo.index>".
// No exception leaves this method: on any failure the task is marked aborted, the scheduler is
// notified with YACS::ABORT and the resulting state is traced.
// NOTE(review): listing lines 1429, 1446 and 1456 are not present in this view.
1423 void Executor::loadTask(Task *task, const WorkloadManager::RunInfo& runInfo)
1424 {
1425  if(task->getState() != YACS::TOLOAD)
1426  return;
1427  traceExec(task, "state:TOLOAD", ComputePlacement(task));
1428  {//Critical section
1430  _mainSched->notifyFrom(task,YACS::START,this);
1431  }//End of critical section
1432  try
1433  {
1434  std::ostringstream container_name;
1435  container_name << runInfo.type.name << "-" << runInfo.index;
1436  task->imposeResource(runInfo.resource.name, container_name.str());
1437  traceExec(task, "load", ComputePlacement(task));
1438  task->load();
1439  traceExec(task, "initService", ComputePlacement(task));
1440  task->initService();
1441  }
1442  catch(Exception& ex)
1443  {
 // YACS exception: its message is reported on std::cerr before the task is aborted.
1444  std::cerr << ex.what() << std::endl;
1445  {//Critical section
1447  task->aborted();
1448  _mainSched->notifyFrom(task,YACS::ABORT, this);
1449  traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1450  }//End of critical section
1451  }
1452  catch(...)
1453  {
 // Any other exception: same abort sequence, with a generic message.
1454  std::cerr << "Load failed" << std::endl;
1455  {//Critical section
1457  task->aborted();
1458  _mainSched->notifyFrom(task,YACS::ABORT, this);
1459  traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1460  }//End of critical section
1461  }
1462 }
1463 
// Body of Executor::beginTask(Task *task). The signature line (1464) is absent from
// this listing; the name comes from the cross-reference index at the end of the file.
// Adds the task to the _runningTasks set.
// NOTE(review): numbered lines 1467-1468 are missing between the "Critical section"
// markers; presumably the lock acquisition and related bookkeeping - confirm upstream.
1465 {
1466  // --- Critical section
1469  _runningTasks.insert(task);
1470  // --- End of critical section
1471 }
1472 
// Body of Executor::endTask(Task *task, YACS::Event ev). The signature line (1473) is
// absent from this listing; the name comes from the cross-reference index at the end
// of the file.
// Applies finished()/aborted() to the task according to ev, forwards ev to the
// scheduler, removes the task from _runningTasks and finally calls wakeUp().
// Exceptions thrown by that first phase are printed on std::cerr and swallowed.
// NOTE(review): numbered lines 1475, 1482, 1484, 1505, 1509, 1511, 1515-1516 and 1520
// are missing from this listing (locks and conditions, presumably); the control flow
// shown below is therefore incomplete - confirm against the upstream file.
1474 {
1476  try
1477  {
1478  if (ev == YACS::FINISH) task->finished();
1479  if (ev == YACS::ABORT)
1480  {
  // Remember that at least one task failed during this execution.
1481  _errorDetected = true;
  // NOTE(review): the condition guarding the next scope is on missing line 1482.
1483  {
1485  _isOKToEnd = true;
1486  }
1487  task->aborted();
1488  }
1489  //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1490  _mainSched->notifyFrom(task,ev,this);
1491  }
1492  catch(Exception& ex)
1493  {
1494  //notify has failed : it is supposed to have set state
1495  //so no need to do anything
1496  std::cerr << "Error during notification" << std::endl;
1497  std::cerr << ex.what() << std::endl;
1498  }
1499  catch(...)
1500  {
1501  //notify has failed : it is supposed to have set state
1502  //so no need to do anything
1503  std::cerr << "Notification failed" << std::endl;
1504  }
  // The task is no longer running, whatever happened above.
1506  _runningTasks.erase(task);
1507  if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
1508  {
  // NOTE(review): the condition guarding the next scope is on missing line 1509.
1510  {
1512  sendEvent("executor");
1513  _condForPilot.notify_all();
  // NOTE(review): the rest of this condition is on missing lines 1515-1516.
1514  if (_errorDetected &&
1517  _condForStepByStep.notify_all(); // exec thread may be on waitResume
1518  }
1519  }
  // The index at the end of the file documents wakeUp() as "must be used protected by
  // _mutexForSchedulerUpdate"; missing line 1520 presumably takes that lock - confirm.
1521  wakeUp();
1522 }
1523 
1524 void Executor::failTask(Task *task, const std::string& message)
1525 {
1526  ElementaryNode* elemNode = dynamic_cast<ElementaryNode*>(task);
1527  if(elemNode != nullptr)
1528  {
1529  StateLoader(elemNode, YACS::ERROR);
1530  elemNode->setErrorDetails(message);
1531  }
1532  endTask(task, YACS::ABORT);
1533 }
1534 
// Body of YACS::Event Executor::runTask(Task *task). The signature line (1535) is
// absent from this listing; the name and return type come from the cross-reference
// index at the end of the file.
// Activates the task, executes it, disconnects its service, releases its slot in a
// HomogeneousPoolContainer if it has one, and returns the resulting event `ev`.
// `ev` is set to YACS::ABORT when execute() or disconnectService() throws.
// NOTE(review): numbered lines 1538 and 1551 are missing from this listing. Line 1551
// presumably declares `ev` with its initial (success) value and line 1538 presumably
// takes a lock - confirm against the upstream file.
1536 {
1537  { // --- Critical section
1539  task->begin(); //change state to ACTIVATED
1540  }
1541  traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1542 

  // Optional step, enabled by getDPLScopeSensitive(): apply the DPL scope of the main
  // scheduler to the node, when both dynamic_casts succeed.
1543  if(getDPLScopeSensitive())
1544  {
1545  Node *node(dynamic_cast<Node *>(task));
1546  ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
1547  if(node!=0 && gfn!=0)
1548  node->applyDPLScope(gfn);
1549  }
1550 

  // Execute the task; any exception turns the result into YACS::ABORT.
1552  try
1553  {
1554  traceExec(task, "start execution",ComputePlacement(task));
1555  task->execute();
1556  traceExec(task, "end execution OK",ComputePlacement(task));
1557  }
1558  catch(Exception& ex)
1559  {
1560  std::cerr << "YACS Exception during execute" << std::endl;
1561  std::cerr << ex.what() << std::endl;
1562  ev=YACS::ABORT;
  // The exception text is appended to the trace message.
1563  string message = "end execution ABORT, ";
1564  message += ex.what();
1565  traceExec(task, message,ComputePlacement(task));
1566  }
1567  catch(...)
1568  {
1569  // Execution has failed
1570  std::cerr << "Execution has failed: unknown reason" << std::endl;
1571  ev=YACS::ABORT;
1572  traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1573  }
1574 

1575  // Disconnect task
  // Done whether or not execution succeeded; a failure here also yields YACS::ABORT.
1576  try
1577  {
1578  DEBTRACE("task->disconnectService()");
1579  task->disconnectService();
1580  traceExec(task, "disconnectService",ComputePlacement(task));
1581  }
1582  catch(...)
1583  {
1584  // Disconnect has failed
1585  std::cerr << "disconnect has failed" << std::endl;
1586  ev=YACS::ABORT;
1587  traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1588  }
1589  //
1590 

  // NOTE(review): `placement` is not read anywhere in the lines visible below.
1591  std::string placement(ComputePlacement(task));
1592 

1593  // container management for HomogeneousPoolOfContainer
1594 

  // Release the task's place in the pool while holding the container's own mutex.
1595  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1596  if(contC)
1597  {
1598  std::lock_guard<std::mutex> alckCont(contC->getLocker());
1599  contC->release(task);
1600  }
1601 

1602  return ev;
1603 }
1604 
// Body of Executor::makeDatastreamConnections(Task *task). The signature line (1605)
// is absent from this listing; the name comes from the cross-reference index at the
// end of the file.
// Calls connectService() on the task and marks it connected. On any exception the
// service is disconnected (best effort), the task is marked aborted and the scheduler
// is notified with YACS::ABORT. If the task ends up in YACS::ERROR, its coupled tasks
// are disconnected and aborted as well.
// NOTE(review): numbered lines 1607-1608, 1615, 1633, 1652, 1678 and 1680 are missing
// from this listing (the guard of the early return and, presumably, lock acquisitions
// and a scheduler notification) - confirm against the upstream file.
1606 {
  // NOTE(review): the condition guarding this early return is on missing lines 1607-1608.
1609  return;
1610  try
1611  {
1612  task->connectService();
1613  traceExec(task, "connectService",ComputePlacement(task));
1614  {//Critical section
1616  task->connected();
1617  }//End of critical section
1618  }
1619  catch(Exception& ex)
1620  {
1621  std::cerr << ex.what() << std::endl;
  // Best-effort cleanup: a failing disconnect is only traced.
1622  try
1623  {
1624  (task)->disconnectService();
1625  traceExec(task, "disconnectService",ComputePlacement(task));
1626  }
1627  catch(...)
1628  {
1629  // Disconnect has failed
1630  traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1631  }
1632  {//Critical section
1634  task->aborted();
1635  _mainSched->notifyFrom(task,YACS::ABORT,this);
1636  }//End of critical section
1637  }
1638  catch(...)
1639  {
  // Same handling as above for exceptions that are not YACS Exceptions.
1640  std::cerr << "Problem in connectService" << std::endl;
1641  try
1642  {
1643  (task)->disconnectService();
1644  traceExec(task, "disconnectService",ComputePlacement(task));
1645  }
1646  catch(...)
1647  {
1648  // Disconnect has failed
1649  traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1650  }
1651  {//Critical section
1653  task->aborted();
1654  _mainSched->notifyFrom(task,YACS::ABORT,this);
1655  }//End of critical section
1656  }
1657  if(task->getState() == YACS::ERROR)
1658  {
1659  //try to put all coupled tasks in error
1660  std::set<Task*> coupledSet;
1661  task->getCoupledTasks(coupledSet);
1662  for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
1663  {
1664  Task* t=*it;
  // Skip the failing task itself and tasks that are already in error.
1665  if(t == task)continue;
1666  if(t->getState() == YACS::ERROR)continue;
  // NOTE(review): the traces below are emitted for `t` but the placement is computed
  // from `task`, not from `t` - confirm that this is intended.
1667  try
1668  {
1669  t->disconnectService();
1670  traceExec(t, "disconnectService",ComputePlacement(task));
1671  }
1672  catch(...)
1673  {
1674  // Disconnect has failed
1675  traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
1676  }
1677  {//Critical section
1679  t->aborted();
1681  }//End of critical section
1682  traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
1683  }
1684  }
1685  traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1686 }
1687 
// Execute a graph by handing its ready tasks to a WorkloadManager.
//
// graph       : scheduler to execute; it must also be a ComposedNode, otherwise an
//               Exception is thrown.
// debug       : threshold for _displayDot() dumps (the calls below are guarded by
//               debug > 0, debug > 1 or debug > 2 depending on the location).
// fromScratch : when true, graph->init() and graph->exUpdateState() are called before
//               the main loop; an Exception raised there is rethrown after an
//               "executor" event has been sent.
//
// The function opens the trace file "traceExec_<graph name>" at the start and closes
// it at the end.
// NOTE(review): this listing omits many numbered source lines (1692, 1696, 1701-1702,
// 1723, 1728, 1737, 1742, 1748, 1752, 1760, 1765, 1778, 1801, 1814, 1818, 1823, 1825,
// 1828); presumably lock acquisitions, executor-state assignments and a few
// conditions - confirm against the upstream file before relying on the flow below.
1688 void Executor::runWlm(Scheduler *graph,int debug, bool fromScratch)
1689 {
1690  DEBTRACE("Executor::runWlm debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
1691  { // --- Critical section
  // Reset the executor's bookkeeping for a new run.
1693  _mainSched = graph;
1694  _root = dynamic_cast<ComposedNode *>(_mainSched);
1695  if (!_root) throw Exception("Executor::Run, Internal Error!");
1697  sendEvent("executor");
1698  _toContinue=true;
1699  _isOKToEnd = false;
1700  _errorDetected = false;
1703  _runningTasks.clear();
1704  _numberOfEndedTasks = 0;
  // The trace file is named after the graph.
1705  string tracefile = "traceExec_";
1706  tracefile += _mainSched->getName();
1707  _trace.open(tracefile.c_str());
1708  _start = std::chrono::steady_clock::now();
1709  } // --- End of critical section
1710 

1711  if (debug > 1) _displayDot(graph);
1712 

1713  if (fromScratch)
1714  {
1715  try
1716  {
1717  graph->init();
1718  graph->exUpdateState();
1719  }
1720  catch(Exception& ex)
1721  {
  // Observers are notified before the exception is propagated to the caller.
1722  DEBTRACE("exception: "<< (ex.what()));
1724  sendEvent("executor");
1725  throw;
1726  }
1727  }
1729  sendEvent("executor");
1730 

1731  if (debug > 1) _displayDot(graph);
1732 

  // isMore is an output argument of getNextTasks(); numberAllTasks is assigned at each
  // loop iteration before being read.
1733  bool isMore;
1734  int problemCount=0;
1735  int numberAllTasks;
1736 

1738  sendEvent("executor");
1739 

1740  WorkloadManager::DefaultAlgorithm algo;
1741  WorkloadManager::WorkloadManager wlm(algo);
1743  wlm.start();
1744 

  // Main loop: runs until the graph is finished, a stop is requested through
  // checkBreakPoints(), or too many idle iterations have been counted.
1745  while (_toContinue)
1746  {
1747  DEBTRACE("--- executor main loop");
1749  DEBTRACE("--- events...");
1750  if (debug > 2) _displayDot(graph);
1751  { // --- Critical section
  // Collect the runnable tasks and keep only those not already running.
1753  std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
1754  graph->selectRunnableTasks(readyTasks);
1755  _tasks.clear();
1756  for(Task * t : readyTasks)
1757  if(_runningTasks.find(t) == _runningTasks.end())
1758  _tasks.push_back(t);
1759  // TODO: to be removed
1761  numberAllTasks=_numberOfRunningTasks+_tasks.size();
1762  } // --- End of critical section
1763  if (debug > 2) _displayDot(graph);
1764  DEBTRACE("--- events...");
1766  {
1767  if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
1768  for(Task * task : _tasks)
1769  {
  // Register the task as running, then submit it to the workload manager.
  // NOTE(review): newTask is allocated with a raw new and never deleted here;
  // presumably ownership passes to the WlmTask/WorkloadManager side - confirm.
1770  beginTask(task);
1771  WlmTask* newTask = new WlmTask(*this, task);
1772  wlm.addTask(newTask);
1773  }
1774  }
1775  if (debug > 1) _displayDot(graph);
1776  { // --- Critical section
1777  DEBTRACE("---");
1779  //The graph may be finished while some tasks are still running (it's an error but we must take it into account)
1780  _toContinue = !graph->isFinished();
1781 

1782  DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
1783  DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
1784  DEBTRACE("_toContinue: " << _toContinue);
1785  if(_toContinue && numberAllTasks==0)
1786  {
1787  //Problem : no running tasks and no task to launch ??
1788  problemCount++;
1789  std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
1790  //Pause to give a chance to interrupt
1791  usleep(1000);
1792  if(problemCount > 25)
1793  {
1794  // Too many problems encountered : stop execution
1795  _toContinue=false;
1796  }
1797  }
1798 

1799  if (! _toContinue)
1800  {
  // Loop is about to end: notify observers and wake any thread waiting on _condForPilot.
1802  sendEvent("executor");
1803  _condForPilot.notify_all();
1804  }
1805  } // --- End of critical section
1806  if (debug > 0) _displayDot(graph);
1807  DEBTRACE("_toContinue: " << _toContinue);
1808  }
1809 

1810  wlm.stop();
1811  DEBTRACE("End of main Loop");
1812 

1813  { // --- Critical section
1815  if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
1816  {
1817  DEBTRACE("stop requested: End soon");
1819  _toContinue = false;
1820  sendEvent("executor");
1821  }
1822  } // --- End of critical section
  // NOTE(review): the content of the next scope (lines 1823 and 1825) is missing from this listing.
1824  {
1826  }
1827  {
1829  _trace.close();
1830  }
1831 }
1832 
1833 void Executor::RunW(Scheduler *graph,int debug, bool fromScratch)
1834 {
1835  std::string str_value = graph->getProperty("executor");
1836  if(str_value == "WorkloadManager"
1837  || str_value == "WORKLOADMANAGER"
1838  || str_value == "workloadmanager"
1839  || str_value == "WorkLoadManager")
1840  runWlm(graph, debug, fromScratch);
1841  else
1842  RunB(graph, debug, fromScratch);
1843 }
static int isfile(const char *filename)
Definition: Executor.cxx:706
#define YASSERT(val)
YASSERT macro is always defined, used like assert, but throw a YACS::Exception instead of abort.
Definition: YacsTrace.hxx:44
#define DEBTRACE(msg)
Definition: YacsTrace.hxx:32
Base class for all composed nodes.
void accept(Visitor *visitor)
virtual std::string getFullPlacementId(const Task *askingNode) const =0
virtual std::string getName() const
Definition: Container.hxx:81
Base class for dispatcher in observer pattern.
Definition: Dispatcher.hxx:74
virtual void dispatch(Node *object, const std::string &event)
Definition: Dispatcher.cxx:85
static Dispatcher * getDispatcher()
Definition: Dispatcher.cxx:55
Base class for all calculation nodes.
Threaded Executor.
Definition: Executor.hxx:63
void setMaxNbOfThreads(int maxNbThreads)
Definition: Executor.cxx:1116
std::uint32_t _maxNbThreads
Definition: Executor.hxx:75
void displayDot(Scheduler *graph)
Display the graph state as a dot display, public method.
Definition: Executor.cxx:718
std::list< std::string > _listOfTasksToLoad
Definition: Executor.hxx:90
virtual void sendEvent(const std::string &event)
emit notification to all observers registered with the dispatcher
Definition: Executor.cxx:1324
YACS::ExecutionMode getCurrentExecMode()
Definition: Executor.cxx:373
YACS::BASES::Mutex _mutexForTrace
Definition: Executor.hxx:74
std::vector< Task * > _tasks
Definition: Executor.hxx:91
bool setStepsToExecute(std::list< std::string > listToExecute)
Define a subset of task to execute in step by step mode.
Definition: Executor.cxx:545
std::list< std::string > getTasksToLoad()
Get the list of tasks to load, to define a subset to execute in step by step mode.
Definition: Executor.cxx:507
bool _isWaitingEventsFromRunningTasks
Definition: Executor.hxx:82
bool _isRunningunderExternalControl
Definition: Executor.hxx:81
void failTask(Task *task, const std::string &message)
Definition: Executor.cxx:1524
bool saveState(const std::string &xmlFile)
save the current state of execution in an xml file
Definition: Executor.cxx:678
void makeDatastreamConnections(Task *task)
Definition: Executor.cxx:1605
YACS::ExecutionMode _execMode
Definition: Executor.hxx:88
std::set< Task * > _runningTasks
Definition: Executor.hxx:84
bool loadState()
not yet implemented
Definition: Executor.cxx:698
void launchTask(Task *task)
Execute a Task in a thread.
Definition: Executor.cxx:1022
static size_t _threadStackSize
Definition: Executor.hxx:132
void resume(bool suspended)
Definition: Executor.cxx:660
void launchTasks(const std::vector< Task * > &tasks)
Execute a list of tasks possibly connected through datastream links.
Definition: Executor.cxx:921
void waitPause()
suspend pilot execution until Executor is in pause or waiting tasks completion mode.
Definition: Executor.cxx:608
void setExecMode(YACS::ExecutionMode mode)
Dynamically set the current mode of execution.
Definition: Executor.cxx:430
static void * functionForTaskExecution(void *)
Function to perform execution of a task in a thread.
Definition: Executor.cxx:1165
void loadParallelTasks(const std::vector< Task * > &tasks, const Executor *execInst)
Definition: Executor.cxx:899
bool checkBreakPoints()
Wait reactivation in modes Step By step or with BreakPoints.
Definition: Executor.cxx:752
YACS::BASES::Mutex _mutexForSchedulerUpdate
Definition: Executor.hxx:73
YACS::Event runTask(Task *task)
Definition: Executor.cxx:1535
bool resumeCurrentBreakPoint()
wake up executor when in pause
Definition: Executor.cxx:448
void sleepWhileNoEventsFromAnyRunningTask()
wait until a running task ends
Definition: Executor.cxx:1082
void _displayDot(Scheduler *graph)
Display the graph state as a dot display.
Definition: Executor.cxx:729
void RunA(Scheduler *graph, int debug=0, bool fromScratch=true)
Execute a graph waiting for completion.
Definition: Executor.cxx:116
void stopExecution()
stops the execution as soon as possible
Definition: Executor.cxx:668
std::string _dumpErrorFile
Definition: Executor.hxx:94
void setStopOnError(bool dumpRequested=false, std::string xmlFile="")
ask to stop execution on the first node found in error
Definition: Executor.cxx:399
std::vector< Task * > _tasksSave
Definition: Executor.hxx:92
YACS::BASES::Condition _condForNewTasksToPerform
Definition: Executor.hxx:69
static int _maxThreads
Definition: Executor.hxx:131
std::list< std::string > _listOfBreakPoints
Definition: Executor.hxx:89
void RunB(Scheduler *graph, int debug=0, bool fromScratch=true)
Execute a graph with breakpoints or step by step.
Definition: Executor.cxx:235
void wakeUp()
must be used protected by _mutexForSchedulerUpdate!
Definition: Executor.cxx:1099
void unsetStopOnError()
ask to do not stop execution on nodes found in error
Definition: Executor.cxx:416
YACS::BASES::Semaphore _semForMaxThreads
Definition: Executor.hxx:70
bool getDPLScopeSensitive() const
Definition: Executor.hxx:109
static std::string ComputePlacement(Task *zeTask)
Definition: Executor.cxx:1412
void RunW(Scheduler *graph, int debug=0, bool fromScratch=true)
Definition: Executor.cxx:1833
YACS::ExecutorState getExecutorState()
Definition: Executor.cxx:380
void endTask(Task *task, YACS::Event ev)
Definition: Executor.cxx:1473
YACS::ExecutorState _executorState
Definition: Executor.hxx:87
ComposedNode * _root
Definition: Executor.hxx:66
std::ofstream _trace
Definition: Executor.hxx:93
void loadTask(Task *task, const WorkloadManager::RunInfo &runInfo)
Definition: Executor.cxx:1423
void filterTasksConsideringContainers(std::vector< Task * > &tsks)
Definition: Executor.cxx:1352
void runWlm(Scheduler *graph, int debug=0, bool fromScratch=true)
Definition: Executor.cxx:1688
static void * functionForTaskLoad(void *)
Definition: Executor.cxx:1140
void setListOfBreakPoints(std::list< std::string > listOfBreakPoints)
define a list of nodes names as breakpoints in the graph
Definition: Executor.cxx:491
void traceExec(Task *task, const std::string &message, const std::string &placement)
Definition: Executor.cxx:1301
int getMaxNbOfThreads() const
Definition: Executor.cxx:1111
void beginTask(Task *task)
Definition: Executor.cxx:1464
Scheduler * _mainSched
Definition: Executor.hxx:65
YACS::BASES::Mutex _mutexForNbOfConcurrentThreads
Definition: Executor.hxx:68
int getNumberOfRunningTasks()
number of running tasks
Definition: Executor.cxx:1130
YACS::BASES::Condition _condForPilot
Definition: Executor.hxx:72
std::chrono::steady_clock::time_point _start
Definition: Executor.hxx:98
YACS::BASES::Condition _condForStepByStep
Definition: Executor.hxx:71
void waitResume()
in modes Step By step or with BreakPoint, wait until pilot resumes the execution
Definition: Executor.cxx:840
virtual std::size_t getNumberOfFreePlace() const =0
virtual void release(const Task *node)=0
virtual int getNumberOfCoresPerWorker() const =0
virtual void allocateFor(const std::vector< const Task * > &nodes)=0
Base class for all nodes.
Definition: Node.hxx:70
virtual void setErrorDetails(const std::string &error)
Definition: Node.hxx:190
static std::string getStateName(YACS::StatesForNode state)
Return the name of a state.
Definition: Node.cxx:820
virtual void applyDPLScope(ComposedNode *gfn)
Definition: Node.cxx:676
virtual std::vector< Task * > getNextTasks(bool &isMore)=0
virtual std::string getName() const =0
virtual std::string getTaskName(Task *task) const =0
virtual bool isFinished()=0
virtual void notifyFrom(const Task *sender, YACS::Event event, const Executor *execInst)=0
virtual void init(bool start=true)=0
virtual void selectRunnableTasks(std::vector< Task * > &tasks)=0
virtual void exUpdateState()=0
virtual std::string getProperty(const std::string &name)=0
virtual void initService()=0
virtual void begin()=0
virtual void aborted()=0
virtual void imposeResource(const std::string &resource_name, const std::string &container_name)
Definition: Task.hxx:55
virtual void connected()=0
virtual void finished()=0
virtual void getCoupledTasks(std::set< Task * > &coupledSet)=0
virtual Container * getContainer()=0
virtual void disconnectService()=0
virtual void load()=0
virtual void execute()=0
virtual void connectService()=0
virtual YACS::StatesForNode getState() const =0
void openFileDump(const std::string &xmlDump)
static void loadResources(WorkloadManager::WorkloadManager &wm)
Definition: WlmTask.cxx:84
void YACSLIBENGINE_EXPORT StateLoader(Node *node, YACS::StatesForNode state)
Definition: Node.cxx:712
Event
Definition: define.hxx:56
@ ABORT
Definition: define.hxx:60
@ START
Definition: define.hxx:58
@ FINISH
Definition: define.hxx:59
ExecutionMode
Definition: define.hxx:75
@ STEPBYSTEP
Definition: define.hxx:77
@ CONTINUE
Definition: define.hxx:76
@ STOPBEFORENODES
Definition: define.hxx:78
ExecutorState
Definition: define.hxx:64
@ RUNNING
Definition: define.hxx:67
@ PAUSED
Definition: define.hxx:69
@ WAITINGTASKS
Definition: define.hxx:68
@ INITIALISED
Definition: define.hxx:66
@ NOTYETINITIALIZED
Definition: define.hxx:65
@ FINISHED
Definition: define.hxx:70
@ STOPPED
Definition: define.hxx:71
StatesForNode
Definition: define.hxx:34
@ TOLOAD
Definition: define.hxx:38
@ TOACTIVATE
Definition: define.hxx:40
@ TORECONNECT
Definition: define.hxx:48
@ ERROR
Definition: define.hxx:52
bool operator()(HomogeneousPoolContainer *lhs, HomogeneousPoolContainer *rhs) const
Definition: Executor.cxx:1334
Scheduler * sched
Definition: Executor.cxx:895
Task * task
Definition: Executor.cxx:894
Executor * execInst
Definition: Executor.cxx:896