Version: 9.15.0
YACS::ENGINE::Executor Class Reference

Threaded Executor. More...

#include <Executor.hxx>

Collaboration diagram for YACS::ENGINE::Executor:

Public Member Functions

 Executor ()
 
virtual ~Executor ()
 
void RunA (Scheduler *graph, int debug=0, bool fromScratch=true)
 Execute a graph waiting for completion. More...
 
void RunW (Scheduler *graph, int debug=0, bool fromScratch=true)
 
void RunB (Scheduler *graph, int debug=0, bool fromScratch=true)
 Execute a graph with breakpoints or step by step. More...
 
void runWlm (Scheduler *graph, int debug=0, bool fromScratch=true)
 
void setKeepGoingProperty (bool newVal)
 
bool getKeepGoingProperty () const
 
void setDPLScopeSensitive (bool newVal)
 
bool getDPLScopeSensitive () const
 
YACS::ExecutionMode getCurrentExecMode ()
 
YACS::ExecutorState getExecutorState ()
 
void setExecMode (YACS::ExecutionMode mode)
 Dynamically set the current mode of execution. More...
 
void setListOfBreakPoints (std::list< std::string > listOfBreakPoints)
 define a list of node names as breakpoints in the graph More...
 
std::list< std::string > getTasksToLoad ()
 Get the list of tasks to load, to define a subset to execute in step by step mode. More...
 
bool setStepsToExecute (std::list< std::string > listToExecute)
 Define a subset of tasks to execute in step by step mode. More...
 
bool resumeCurrentBreakPoint ()
 wake up executor when in pause More...
 
bool isNotFinished ()
 
void stopExecution ()
 stops the execution as soon as possible More...
 
bool saveState (const std::string &xmlFile)
 save the current state of execution in an xml file More...
 
bool loadState ()
 not yet implemented More...
 
int getMaxNbOfThreads () const
 
void setMaxNbOfThreads (int maxNbThreads)
 
int getNbOfThreads ()
 
int getNumberOfRunningTasks ()
 number of running tasks More...
 
void displayDot (Scheduler *graph)
 Display the graph state as a dot display, public method. More...
 
void setStopOnError (bool dumpRequested=false, std::string xmlFile="")
 ask to stop execution on the first node found in error More...
 
void unsetStopOnError ()
 ask not to stop execution on nodes found in error More...
 
void waitPause ()
 suspend pilot execution until the Executor is paused or is waiting for running tasks to complete. More...
 
bool suspendASAP ()
 
void resume (bool suspended)
 
YACS::BASES::Mutex & getTheMutexForSchedulerUpdate ()
 
void loadTask (Task *task, const WorkloadManager::RunInfo &runInfo)
 
YACS::Event runTask (Task *task)
 
void makeDatastreamConnections (Task *task)
 
void beginTask (Task *task)
 
void endTask (Task *task, YACS::Event ev)
 
void failTask (Task *task, const std::string &message)
 

Static Public Attributes

static int _maxThreads
 
static size_t _threadStackSize
 

Protected Member Functions

bool checkBreakPoints ()
 Wait for reactivation in step-by-step mode or in breakpoint mode. More...
 
void waitResume ()
 in step-by-step mode or in breakpoint mode, wait until the pilot resumes the execution More...
 
void loadTask (Task *task, const Executor *execInst)
 Perform loading of a Task. More...
 
void loadParallelTasks (const std::vector< Task * > &tasks, const Executor *execInst)
 
void launchTasks (const std::vector< Task * > &tasks)
 Execute a list of tasks possibly connected through datastream links. More...
 
void launchTask (Task *task)
 Execute a Task in a thread. More...
 
void wakeUp ()
 must be called while protected by _mutexForSchedulerUpdate! More...
 
void sleepWhileNoEventsFromAnyRunningTask ()
 wait until a running task ends More...
 
void traceExec (Task *task, const std::string &message, const std::string &placement)
 
void _displayDot (Scheduler *graph)
 Display the graph state as a dot display. More...
 
virtual void sendEvent (const std::string &event)
 emit notification to all observers registered with the dispatcher More...
 
void filterTasksConsideringContainers (std::vector< Task * > &tsks)
 

Static Protected Member Functions

static std::string ComputePlacement (Task *zeTask)
 
static void * functionForTaskLoad (void *)
 
static void * functionForTaskExecution (void *)
 Function to perform execution of a task in a thread. More...
 

Protected Attributes

Scheduler * _mainSched
 
ComposedNode * _root
 
int _nbOfConcurrentThreads
 
YACS::BASES::Mutex _mutexForNbOfConcurrentThreads
 
YACS::BASES::Condition _condForNewTasksToPerform
 
YACS::BASES::Semaphore _semForMaxThreads
 
YACS::BASES::Condition _condForStepByStep
 
YACS::BASES::Condition _condForPilot
 
YACS::BASES::Mutex _mutexForSchedulerUpdate
 
YACS::BASES::Mutex _mutexForTrace
 
std::uint32_t _maxNbThreads = 10000
 
bool _toContinue
 
bool _isOKToEnd
 
bool _stopOnErrorRequested
 
bool _dumpOnErrorRequested
 
bool _errorDetected
 
bool _isRunningunderExternalControl
 
bool _isWaitingEventsFromRunningTasks
 
int _numberOfRunningTasks
 
std::set< Task * > _runningTasks
 
int _numberOfEndedTasks
 
int _semThreadCnt
 
YACS::ExecutorState _executorState
 
YACS::ExecutionMode _execMode
 
std::list< std::string > _listOfBreakPoints
 
std::list< std::string > _listOfTasksToLoad
 
std::vector< Task * > _tasks
 
std::vector< Task * > _tasksSave
 
std::ofstream _trace
 
std::string _dumpErrorFile
 
bool _keepGoingOnFail
 
bool _DPLScopeSensitive
 specifies if scope DynParaLoop is active or not. False by default. More...
 
std::chrono::steady_clock::time_point _start
 

Detailed Description

Threaded Executor.

Definition at line 62 of file Executor.hxx.

Constructor & Destructor Documentation

◆ Executor()

Executor::Executor ( )

Definition at line 82 of file Executor.cxx.

83 {
84  _root=0;
85  _toContinue = true;
86  _isOKToEnd = false;
87  _stopOnErrorRequested = false;
88  _dumpOnErrorRequested = false;
89  _errorDetected = false;
96  DEBTRACE("Executor initialized with max threads = " << _maxThreads);
97 }
#define DEBTRACE(msg)
Definition: YacsTrace.hxx:31
bool _isWaitingEventsFromRunningTasks
Definition: Executor.hxx:82
bool _isRunningunderExternalControl
Definition: Executor.hxx:81
bool _DPLScopeSensitive
specifies if scope DynParaLoop is active or not. False by default.
Definition: Executor.hxx:97
YACS::ExecutionMode _execMode
Definition: Executor.hxx:88
static int _maxThreads
Definition: Executor.hxx:131
YACS::BASES::Semaphore _semForMaxThreads
Definition: Executor.hxx:70
YACS::ExecutorState _executorState
Definition: Executor.hxx:87
ComposedNode * _root
Definition: Executor.hxx:66
Scheduler * _mainSched
Definition: Executor.hxx:65
@ CONTINUE
Definition: define.hxx:76
@ NOTYETINITIALIZED
Definition: define.hxx:65

References _dumpOnErrorRequested, _errorDetected, _execMode, _executorState, _isOKToEnd, _isRunningunderExternalControl, _maxThreads, _numberOfEndedTasks, _numberOfRunningTasks, _root, _semThreadCnt, _stopOnErrorRequested, _toContinue, YACS::CONTINUE, DEBTRACE, and YACS::NOTYETINITIALIZED.

◆ ~Executor()

Executor::~Executor ( )
virtual

Definition at line 99 of file Executor.cxx.

100 {
101 }

Member Function Documentation

◆ _displayDot()

void Executor::_displayDot ( Scheduler * graph)
protected

Display the graph state as a dot display.

Parameters
graph: the node to display

Definition at line 729 of file Executor.cxx.

730 {
731  std::ofstream g("titi");
732  ((ComposedNode*)graph)->writeDot(g);
733  g.close();
734  const char displayScript[]="display.sh";
735  if(isfile(displayScript))
736  system("sh display.sh");
737  else
738  system("dot -Tpng titi|display -delay 5");
739 }
static int isfile(const char *filename)
Definition: Executor.cxx:706
Base class for all composed nodes.

References isfile().

Referenced by displayDot(), RunA(), RunB(), and runWlm().

◆ beginTask()

void Executor::beginTask ( Task * task)

Definition at line 1464 of file Executor.cxx.

1465 {
1466  // --- Critical section
1469  _runningTasks.insert(task);
1470  // --- End of critical section
1471 }
std::set< Task * > _runningTasks
Definition: Executor.hxx:84
YACS::BASES::Mutex _mutexForSchedulerUpdate
Definition: Executor.hxx:73

References _mutexForSchedulerUpdate, _numberOfRunningTasks, and _runningTasks.

Referenced by runWlm().

◆ checkBreakPoints()

bool Executor::checkBreakPoints ( )
protected

Wait for reactivation in step-by-step mode or in breakpoint mode.

Check mode of execution (set by main thread):

Definition at line 752 of file Executor.cxx.

753 {
754  DEBTRACE("Executor::checkBreakPoints()");
755  vector<Task *>::iterator iter;
756  bool endRequested = false;
757 
758  switch (_execMode)
759  {
760  case YACS::CONTINUE:
761  {
762  break;
763  }
765  {
766  bool stop = false;
767  { // --- Critical section
769  _tasksSave = _tasks;
770  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
771  {
772  string nodeToLoad = _mainSched->getTaskName(*iter);
773  if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
774  != _listOfBreakPoints.end())
775  {
776  stop = true;
777  break;
778  }
779  }
780  if (stop)
781  {
782  _listOfTasksToLoad.clear();
783  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
784  {
785  string nodeToLoad = _mainSched->getTaskName(*iter);
786  _listOfTasksToLoad.push_back(nodeToLoad);
787  }
788  if (getNbOfThreads())
789  _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
790  else
792  sendEvent("executor");
793  _condForPilot.notify_all();
794  }
795  if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
796  if (_isOKToEnd) endRequested = true;
797  } // --- End of critical section
798  if (stop) DEBTRACE("wake up from waitResume");
799  break;
800  }
801  default:
802  case YACS::STEPBYSTEP:
803  {
804  { // --- Critical section
806  _tasksSave = _tasks;
807  _listOfTasksToLoad.clear();
808  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
809  {
810  string nodeToLoad = _mainSched->getTaskName(*iter);
811  _listOfTasksToLoad.push_back(nodeToLoad);
812  }
813  if (getNbOfThreads())
814  _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
815  else
817  sendEvent("executor");
818  _condForPilot.notify_all();
819  if (!_isOKToEnd)
820  waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
821  // or, if no pilot, wait until no more running tasks (stop on error)
822  if (_isOKToEnd) endRequested = true;
823  } // --- End of critical section
824  DEBTRACE("wake up from waitResume");
825  break;
826  }
827  }
828  DEBTRACE("endRequested: " << endRequested);
829  return endRequested;
830 }
std::list< std::string > _listOfTasksToLoad
Definition: Executor.hxx:90
virtual void sendEvent(const std::string &event)
emit notification to all observers registered with the dispatcher
Definition: Executor.cxx:1324
std::vector< Task * > _tasks
Definition: Executor.hxx:91
std::vector< Task * > _tasksSave
Definition: Executor.hxx:92
std::list< std::string > _listOfBreakPoints
Definition: Executor.hxx:89
YACS::BASES::Condition _condForPilot
Definition: Executor.hxx:72
void waitResume()
in modes Step By step or with BreakPoint, wait until pilot resumes the execution
Definition: Executor.cxx:840
virtual std::string getTaskName(Task *task) const =0
@ STEPBYSTEP
Definition: define.hxx:77
@ STOPBEFORENODES
Definition: define.hxx:78
@ PAUSED
Definition: define.hxx:69
@ WAITINGTASKS
Definition: define.hxx:68

References _condForPilot, _execMode, _executorState, _isOKToEnd, _listOfBreakPoints, _listOfTasksToLoad, _mainSched, _mutexForSchedulerUpdate, _tasks, _tasksSave, YACS::CONTINUE, DEBTRACE, getNbOfThreads(), YACS::ENGINE::Scheduler::getTaskName(), YACS::PAUSED, sendEvent(), YACS::STEPBYSTEP, YACS::STOPBEFORENODES, YACS::WAITINGTASKS, and waitResume().

Referenced by RunB(), and runWlm().

◆ ComputePlacement()

std::string Executor::ComputePlacement ( Task * zeTask)
staticprotected

Definition at line 1412 of file Executor.cxx.

1413 {
1414  std::string placement("---");
1415  if(!zeTask)
1416  return placement;
1417  if(zeTask->getContainer())
1418  placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1419  return placement;
1420 }
virtual std::string getFullPlacementId(const Task *askingNode) const =0
virtual Container * getContainer()=0

References YACS::ENGINE::Task::getContainer(), and YACS::ENGINE::Container::getFullPlacementId().

Referenced by functionForTaskExecution(), launchTask(), launchTasks(), loadTask(), makeDatastreamConnections(), and runTask().

◆ displayDot()

void Executor::displayDot ( Scheduler * graph)

Display the graph state as a dot display, public method.

Definition at line 718 of file Executor.cxx.

719 {
721  _displayDot(graph);
722 }
void _displayDot(Scheduler *graph)
Display the graph state as a dot display.
Definition: Executor.cxx:729

References _displayDot(), and _isRunningunderExternalControl.

Referenced by main().

◆ endTask()

void Executor::endTask ( Task * task,
YACS::Event  ev 
)

Definition at line 1473 of file Executor.cxx.

1474 {
1476  try
1477  {
1478  if (ev == YACS::FINISH) task->finished();
1479  if (ev == YACS::ABORT)
1480  {
1481  _errorDetected = true;
1483  {
1485  _isOKToEnd = true;
1486  }
1487  task->aborted();
1488  }
1489  //traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1490  _mainSched->notifyFrom(task,ev,this);
1491  }
1492  catch(Exception& ex)
1493  {
1494  //notify has failed : it is supposed to have set state
1495  //so no need to do anything
1496  std::cerr << "Error during notification" << std::endl;
1497  std::cerr << ex.what() << std::endl;
1498  }
1499  catch(...)
1500  {
1501  //notify has failed : it is supposed to have set state
1502  //so no need to do anything
1503  std::cerr << "Notification failed" << std::endl;
1504  }
1506  _runningTasks.erase(task);
1507  if ((_numberOfRunningTasks == 0) && (_execMode != YACS::CONTINUE)) // no more running tasks
1508  {
1510  {
1512  sendEvent("executor");
1513  _condForPilot.notify_all();
1514  if (_errorDetected &&
1517  _condForStepByStep.notify_all(); // exec thread may be on waitResume
1518  }
1519  }
1521  wakeUp();
1522 }
void wakeUp()
must be used protected by _mutexForSchedulerUpdate!
Definition: Executor.cxx:1099
YACS::BASES::Condition _condForStepByStep
Definition: Executor.hxx:71
virtual void notifyFrom(const Task *sender, YACS::Event event, const Executor *execInst)=0
virtual void aborted()=0
virtual void finished()=0
@ ABORT
Definition: define.hxx:60
@ FINISH
Definition: define.hxx:59

References _condForPilot, _condForStepByStep, _errorDetected, _execMode, _executorState, _isOKToEnd, _isRunningunderExternalControl, _mainSched, _mutexForSchedulerUpdate, _numberOfRunningTasks, _runningTasks, _stopOnErrorRequested, YACS::ABORT, YACS::ENGINE::Task::aborted(), YACS::CONTINUE, testCppPluginInvokation::ex, YACS::FINISH, YACS::ENGINE::Task::finished(), YACS::ENGINE::Scheduler::notifyFrom(), YACS::PAUSED, sendEvent(), YACS::STEPBYSTEP, YACS::WAITINGTASKS, and wakeUp().

Referenced by failTask(), and YACS::ENGINE::WlmTask::run().

◆ failTask()

void Executor::failTask ( Task * task,
const std::string &  message 
)

Definition at line 1524 of file Executor.cxx.

1525 {
1526  ElementaryNode* elemNode = dynamic_cast<ElementaryNode*>(task);
1527  if(elemNode != nullptr)
1528  {
1529  StateLoader(elemNode, YACS::ERROR);
1530  elemNode->setErrorDetails(message);
1531  }
1532  endTask(task, YACS::ABORT);
1533 }
Base class for all calculation nodes.
void endTask(Task *task, YACS::Event ev)
Definition: Executor.cxx:1473
virtual void setErrorDetails(const std::string &error)
Definition: Node.hxx:191
void YACSLIBENGINE_EXPORT StateLoader(Node *node, YACS::StatesForNode state)
Definition: Node.cxx:719
@ ERROR
Definition: define.hxx:52

References YACS::ABORT, endTask(), YACS::ERROR, YACS::ENGINE::Node::setErrorDetails(), and YACS::ENGINE::StateLoader().

Referenced by YACS::ENGINE::WlmTask::run().

◆ filterTasksConsideringContainers()

void Executor::filterTasksConsideringContainers ( std::vector< Task * > &  tsks)
protected

This method takes a list of tasks as input and selects a subset of it, considering only the containers. If no task has a container that is an instance of a HomogeneousPoolContainer subclass, this method leaves tsks untouched (apart from dropping null entries and capping the number of tasks to the number of free thread slots).

Parameters
[in,out] tsks - list of tasks to be filtered; on return it holds only the tasks selected to be launched

Definition at line 1352 of file Executor.cxx.

1353 {
1354  std::map<HomogeneousPoolContainer *, std::vector<Task *>, HPCCompare > m;
1355  for(auto cur : tsks)
1356  {
1357  if(!cur)
1358  continue;
1359  Container *cont(cur->getContainer());
1360  if(!cont)
1361  {
1362  m[nullptr].push_back(cur);
1363  continue;
1364  }
1365  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1366  if(!contC)
1367  {
1368  m[nullptr].push_back(cur);
1369  continue;
1370  }
1371  m[contC].push_back(cur);
1372  }
1373  //
1374  std::vector<Task *> ret;
1375  for(auto it : m)
1376  {
1377  HomogeneousPoolContainer *curhpc(it.first);
1378  const std::vector<Task *>& curtsks(it.second);
1379  if(!curhpc)
1380  {
1381  std::uint32_t nbThreadsRunning = _runningTasks.size();
1382  std::uint32_t nbOfFreeSpace = _maxNbThreads - min(_maxNbThreads,nbThreadsRunning);
1383  std::uint32_t nbOfCandidates = static_cast<std::uint32_t>( curtsks.size() );
1384  std::uint32_t nbOfCandidatesToBeLaunched = std::min(nbOfCandidates,nbOfFreeSpace);
1385  DEBTRACE("nb threads running: " << nbThreadsRunning);
1386  DEBTRACE("MaxNbThreads: " << _maxNbThreads);
1387  DEBTRACE("nbOfFreeSpace: " << nbOfFreeSpace);
1388  DEBTRACE("nbOfCandidates: " << nbOfCandidates);
1389  DEBTRACE("nbOfCandidatesToBeLaunched: " << nbOfCandidatesToBeLaunched);
1390  ret.insert(ret.end(),curtsks.begin(),curtsks.begin() + nbOfCandidatesToBeLaunched);
1391  }
1392  else
1393  {
1394  // start of critical section for container curhpc
1395  std::lock_guard<std::mutex> alckCont(curhpc->getLocker());
1396  std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1397  std::size_t sz(curhpc->getNumberOfFreePlace());
1398  std::vector<Task *>::const_iterator it2(curtsks.begin());
1399  for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1400  {
1401  vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1402  ret.push_back(*it2);
1403  }
1404  curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1405  //end of critical section
1406  }
1407  }
1408  //
1409  tsks=ret;
1410 }
std::uint32_t _maxNbThreads
Definition: Executor.hxx:75

References _maxNbThreads, _runningTasks, YACS::ENGINE::HomogeneousPoolContainer::allocateFor(), DEBTRACE, YACS::ENGINE::HomogeneousPoolContainer::getLocker(), YACS::ENGINE::HomogeneousPoolContainer::getNumberOfFreePlace(), yacsorb.CORBAEngineTest::i, and gui.GraphViewer::m.

Referenced by RunB(), and runWlm().

◆ functionForTaskExecution()

void * Executor::functionForTaskExecution ( void *  arg)
staticprotected

Function to perform execution of a task in a thread.

Parameters
arg: 3 elements (a Task, a Scheduler, an Executor)

Calls Task::execute

Calls Task::finished when the task is finished

Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished

Calls Executor::wakeUp and Executor::notifyEndOfThread

Definition at line 1165 of file Executor.cxx.

1166 {
1167  DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1168 
1169  struct threadargs *args = (struct threadargs *) arg;
1170  Task *task=args->task;
1171  Scheduler *sched=args->sched;
1172  Executor *execInst=args->execInst;
1173  delete args;
1175 
1176  Thread::detach();
1177 
1178  // Execute task
1179 
1181  {
1182  Node *node(dynamic_cast<Node *>(task));
1183  ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
1184  if(node!=0 && gfn!=0)
1185  node->applyDPLScope(gfn);
1186  }
1187 
1189  try
1190  {
1191  execInst->traceExec(task, "start execution",ComputePlacement(task));
1192  task->execute();
1193  execInst->traceExec(task, "end execution OK",ComputePlacement(task));
1194  }
1195  catch(Exception& ex)
1196  {
1197  std::cerr << "YACS Exception during execute" << std::endl;
1198  std::cerr << ex.what() << std::endl;
1199  ev=YACS::ABORT;
1200  string message = "end execution ABORT, ";
1201  message += ex.what();
1203  }
1204  catch(...)
1205  {
1206  // Execution has failed
1207  std::cerr << "Execution has failed: unknown reason" << std::endl;
1208  ev=YACS::ABORT;
1209  execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1210  }
1211 
1212  // Disconnect task
1213  try
1214  {
1215  DEBTRACE("task->disconnectService()");
1217  execInst->traceExec(task, "disconnectService",ComputePlacement(task));
1218  }
1219  catch(...)
1220  {
1221  // Disconnect has failed
1222  std::cerr << "disconnect has failed" << std::endl;
1223  ev=YACS::ABORT;
1224  execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1225  }
1226  //
1227 
1228  std::string placement(ComputePlacement(task));
1229 
1230  // container management for HomogeneousPoolOfContainer
1231 
1233  if(contC)
1234  {
1235  std::lock_guard<std::mutex> alckCont(contC->getLocker());
1236  contC->release(task);
1237  }
1238 
1239  DEBTRACE("End task->execute()");
1240  { // --- Critical section
1242  try
1243  {
1244  if (ev == YACS::FINISH) task->finished();
1245  if (ev == YACS::ABORT)
1246  {
1247  execInst->_errorDetected = true;
1249  {
1251  execInst->_isOKToEnd = true;
1252  }
1253  task->aborted();
1254  }
1255  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1257  }
1258  catch(Exception& ex)
1259  {
1260  //notify has failed : it is supposed to have set state
1261  //so no need to do anything
1262  std::cerr << "Error during notification" << std::endl;
1263  std::cerr << ex.what() << std::endl;
1264  }
1265  catch(...)
1266  {
1267  //notify has failed : it is supposed to have set state
1268  //so no need to do anything
1269  std::cerr << "Notification failed" << std::endl;
1270  }
1272  execInst->_runningTasks.erase(task);
1273  DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks
1274  << " _execMode: " << execInst->_execMode
1275  << " _executorState: " << execInst->_executorState);
1276  if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1277  {
1279  {
1281  execInst->sendEvent("executor");
1282  execInst->_condForPilot.notify_all();
1283  if (execInst->_errorDetected &&
1286  execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1287  }
1288  }
1289  DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1290  execInst->_semForMaxThreads.post();
1291  execInst->_semThreadCnt += 1;
1292  DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1294 
1295  } // --- End of critical section (change state)
1296 
1297  Thread::exit(0);
1298  return 0;
1299 }
Threaded Executor.
Definition: Executor.hxx:63
bool getDPLScopeSensitive() const
Definition: Executor.hxx:109
static std::string ComputePlacement(Task *zeTask)
Definition: Executor.cxx:1412
void traceExec(Task *task, const std::string &message, const std::string &placement)
Definition: Executor.cxx:1301
Base class for all nodes.
Definition: Node.hxx:70
static std::string getStateName(YACS::StatesForNode state)
Return the name of a state.
Definition: Node.cxx:827
virtual void disconnectService()=0
virtual void execute()=0
virtual YACS::StatesForNode getState() const =0
Event
Definition: define.hxx:56
Scheduler * sched
Definition: Executor.cxx:895
Task * task
Definition: Executor.cxx:894
Executor * execInst
Definition: Executor.cxx:896

References _condForPilot, _condForStepByStep, _errorDetected, _execMode, _executorState, _isOKToEnd, _isRunningunderExternalControl, _mutexForSchedulerUpdate, _numberOfRunningTasks, _runningTasks, _semForMaxThreads, _semThreadCnt, _stopOnErrorRequested, YACS::ABORT, YACS::ENGINE::Task::aborted(), YACS::ENGINE::Node::applyDPLScope(), driver_internal::args, ComputePlacement(), YACS::CONTINUE, DEBTRACE, YACS::ENGINE::Task::disconnectService(), testCppPluginInvokation::ex, threadargs::execInst, YACS::ENGINE::Task::execute(), YACS::FINISH, YACS::ENGINE::Task::finished(), YACS::ENGINE::Task::getContainer(), getDPLScopeSensitive(), YACS::ENGINE::HomogeneousPoolContainer::getLocker(), YACS::ENGINE::Task::getState(), YACS::ENGINE::Node::getStateName(), YACS::ENGINE::Scheduler::notifyFrom(), YACS::PAUSED, YACS::ENGINE::HomogeneousPoolContainer::release(), threadargs::sched, sendEvent(), YACS::STEPBYSTEP, threadargs::task, traceExec(), YACS::WAITINGTASKS, and wakeUp().

Referenced by launchTask().

◆ functionForTaskLoad()

void * Executor::functionForTaskLoad ( void *  arg)
staticprotected

This thread is NOT supposed to be detached!

Definition at line 1140 of file Executor.cxx.

1141 {
1142  DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1143  struct threadargs *args = (struct threadargs *) arg;
1144  Task *task=args->task;
1145  Scheduler *sched=args->sched;
1146  Executor *execInst=args->execInst;
1147  delete args;
1148  execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1149  return 0;
1150 }
void loadTask(Task *task, const WorkloadManager::RunInfo &runInfo)
Definition: Executor.cxx:1423

References driver_internal::args, DEBTRACE, threadargs::execInst, loadTask(), threadargs::sched, and threadargs::task.

Referenced by loadParallelTasks().

◆ getCurrentExecMode()

YACS::ExecutionMode Executor::getCurrentExecMode ( )

Definition at line 373 of file Executor.cxx.

374 {
376  return _execMode;
377 }

References _execMode, and _isRunningunderExternalControl.

◆ getDPLScopeSensitive()

bool YACS::ENGINE::Executor::getDPLScopeSensitive ( ) const
inline

Definition at line 109 of file Executor.hxx.

109 { return _DPLScopeSensitive; }

Referenced by functionForTaskExecution(), and runTask().

◆ getExecutorState()

YACS::ExecutorState Executor::getExecutorState ( )

Definition at line 380 of file Executor.cxx.

381 {
383  return _executorState;
384 }

References _executorState, and _isRunningunderExternalControl.

Referenced by main().

◆ getKeepGoingProperty()

bool YACS::ENGINE::Executor::getKeepGoingProperty ( ) const
inline

Definition at line 107 of file Executor.hxx.

107 { return _keepGoingOnFail; }

Referenced by YACS::ENGINE::ForEachLoopGen::updateStateOnFailedEventFrom().

◆ getMaxNbOfThreads()

int Executor::getMaxNbOfThreads ( ) const

Definition at line 1111 of file Executor.cxx.

1112 {
1113  return (int)_maxNbThreads;
1114 }

References _maxNbThreads.

◆ getNbOfThreads()

int Executor::getNbOfThreads ( )

Definition at line 1121 of file Executor.cxx.

1122 {
1123  int ret = 0;
1126  return ret;
1127 }
YACS::BASES::Mutex _mutexForNbOfConcurrentThreads
Definition: Executor.hxx:68

References _isRunningunderExternalControl, and _mutexForNbOfConcurrentThreads.

Referenced by checkBreakPoints().

◆ getNumberOfRunningTasks()

int Executor::getNumberOfRunningTasks ( )

◆ getTasksToLoad()

std::list< std::string > Executor::getTasksToLoad ( )

Get the list of tasks to load, to define a subset to execute in step by step mode.

If the executor is in neither the YACS::WAITINGTASKS nor the YACS::PAUSED state, the list is empty. Use Executor::waitPause to wait.

Definition at line 507 of file Executor.cxx.

508 {
509  DEBTRACE("Executor::getTasksToLoad()");
510  list<string> listOfNodesToLoad;
511  listOfNodesToLoad.clear();
512  { // --- Critical section
515  switch (_executorState)
516  {
517  case YACS::WAITINGTASKS:
518  case YACS::PAUSED:
519  {
520  listOfNodesToLoad = _listOfTasksToLoad;
521  break;
522  }
524  case YACS::INITIALISED:
525  case YACS::RUNNING:
526  case YACS::FINISHED:
527  case YACS::STOPPED:
528  default:
529  {
530  break;
531  }
532  }
533  } // --- End of critical section
534  return listOfNodesToLoad;
535 }
@ RUNNING
Definition: define.hxx:67
@ INITIALISED
Definition: define.hxx:66
@ FINISHED
Definition: define.hxx:70
@ STOPPED
Definition: define.hxx:71

References _executorState, _isRunningunderExternalControl, _listOfTasksToLoad, _mutexForSchedulerUpdate, DEBTRACE, YACS::FINISHED, YACS::INITIALISED, YACS::NOTYETINITIALIZED, YACS::PAUSED, YACS::RUNNING, YACS::STOPPED, and YACS::WAITINGTASKS.

◆ getTheMutexForSchedulerUpdate()

YACS::BASES::Mutex& YACS::ENGINE::Executor::getTheMutexForSchedulerUpdate ( )
inline

◆ isNotFinished()

bool Executor::isNotFinished ( )

Definition at line 387 of file Executor.cxx.

388 {
390  return _toContinue;
391 }

References _isRunningunderExternalControl, and _toContinue.

Referenced by main().

◆ launchTask()

void Executor::launchTask ( Task * task)
protected

Execute a Task in a thread.

Parameters
task: Task to execute

Calls Scheduler::notifyFrom of main node (_mainSched) to notify start

Calls Executor::functionForTaskExecution in Thread

Definition at line 1022 of file Executor.cxx.

1023 {
1024  DEBTRACE("Executor::launchTask(Task *task)");
1025  struct threadargs *args;
1026  if(task->getState() != YACS::TOACTIVATE)return;
1027 
1028  DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
1029  if(_semThreadCnt == 0)
1030  {
1031  // --- Critical section
1033  //check if we have enough threads to run
1034  std::set<Task*> tmpSet=_runningTasks;
1035  std::set<Task*>::iterator it = tmpSet.begin();
1036  std::string status="running";
1037  std::set<Task*> coupledSet;
1038  while( it != tmpSet.end() )
1039  {
1040  Task* tt=*it;
1041  coupledSet.clear();
1042  tt->getCoupledTasks(coupledSet);
1043  status="running";
1044  for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1045  {
1046  if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1047  tmpSet.erase(*iter);
1048  }
1049  if(status=="running")break;
1050  it = tmpSet.begin();
1051  }
1052 
1053  if(status=="toactivate")
1054  {
1055  std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1056  std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1057  }
1058  // --- End of critical section
1059  }
1060 
1061  _semForMaxThreads.wait();
1062  _semThreadCnt -= 1;
1063 
1064  args= new threadargs;
1065  args->task = task;
1066  args->sched = _mainSched;
1067  args->execInst = this;
1068 
1069  traceExec(task, "launch",ComputePlacement(task));
1070 
1071  { // --- Critical section
1074  _runningTasks.insert(task);
1075  task->begin(); //change state to ACTIVATED
1076  } // --- End of critical section
1078 }
static size_t _threadStackSize
Definition: Executor.hxx:132
static void * functionForTaskExecution(void *)
Function to perform execution of a task in a thread.
Definition: Executor.cxx:1165
virtual void begin()=0
virtual void getCoupledTasks(std::set< Task * > &coupledSet)=0
@ TOACTIVATE
Definition: define.hxx:40

References _mainSched, _maxThreads, _mutexForSchedulerUpdate, _numberOfRunningTasks, _runningTasks, _semForMaxThreads, _semThreadCnt, _threadStackSize, driver_internal::args, YACS::ENGINE::Task::begin(), ComputePlacement(), DEBTRACE, functionForTaskExecution(), YACS::ENGINE::Task::getCoupledTasks(), YACS::ENGINE::Task::getState(), threadargs::task, YACS::TOACTIVATE, and traceExec().

Referenced by launchTasks().

◆ launchTasks()

void Executor::launchTasks ( const std::vector< Task * > &  tasks)
protected

Execute a list of tasks possibly connected through datastream links.

Parameters
tasks: a list of tasks to execute

Definition at line 921 of file Executor.cxx.

922 {
923  //First phase, make datastream connections
924  for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
925  {
926  YACS::StatesForNode state=(*iter)->getState();
927  if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
928  try
929  {
930  (*iter)->connectService();
931  traceExec(*iter, "connectService",ComputePlacement(*iter));
932  {//Critical section
934  (*iter)->connected();
935  }//End of critical section
936  }
937  catch(Exception& ex)
938  {
939  std::cerr << ex.what() << std::endl;
940  try
941  {
942  (*iter)->disconnectService();
943  traceExec(*iter, "disconnectService",ComputePlacement(*iter));
944  }
945  catch(...)
946  {
947  // Disconnect has failed
948  traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
949  }
950  {//Critical section
952  (*iter)->aborted();
953  _mainSched->notifyFrom(*iter,YACS::ABORT,this);
954  }//End of critical section
955  }
956  catch(...)
957  {
958  std::cerr << "Problem in connectService" << std::endl;
959  try
960  {
961  (*iter)->disconnectService();
962  traceExec(*iter, "disconnectService",ComputePlacement(*iter));
963  }
964  catch(...)
965  {
966  // Disconnect has failed
967  traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
968  }
969  {//Critical section
971  (*iter)->aborted();
972  _mainSched->notifyFrom(*iter,YACS::ABORT,this);
973  }//End of critical section
974  }
975  if((*iter)->getState() == YACS::ERROR)
976  {
977  //try to put all coupled tasks in error
978  std::set<Task*> coupledSet;
979  (*iter)->getCoupledTasks(coupledSet);
980  for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
981  {
982  Task* t=*it;
983  if(t == *iter)continue;
984  if(t->getState() == YACS::ERROR)continue;
985  try
986  {
987  t->disconnectService();
988  traceExec(t, "disconnectService",ComputePlacement(*iter));
989  }
990  catch(...)
991  {
992  // Disconnect has failed
993  traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
994  }
995  {//Critical section
997  t->aborted();
999  }//End of critical section
1000  traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
1001  }
1002  }
1003  traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
1004  }
1005 
1006  //Second phase, execute each task in a thread
1007  for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
1008  {
1009  launchTask(*iter);
1010  }
1011 }
void launchTask(Task *task)
Execute a Task in a thread.
Definition: Executor.cxx:1022
StatesForNode
Definition: define.hxx:34
@ TOLOAD
Definition: define.hxx:38
@ TORECONNECT
Definition: define.hxx:48

References _mainSched, _mutexForSchedulerUpdate, YACS::ABORT, ComputePlacement(), YACS::ERROR, testCppPluginInvokation::ex, YACS::ENGINE::Node::getStateName(), launchTask(), YACS::ENGINE::Scheduler::notifyFrom(), yacsorb.CORBAEngineTest::state, gui.Appli::t, YACS::TOLOAD, YACS::TORECONNECT, and traceExec().

Referenced by RunA(), and RunB().

◆ loadParallelTasks()

void Executor::loadParallelTasks ( const std::vector< Task * > &  tasks,
const Executor execInst 
)
protected

Definition at line 899 of file Executor.cxx.

900 {
901  std::vector<Thread> ths(tasks.size());
902  std::size_t ithread(0);
903  for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
904  {
905  DEBTRACE("Executor::loadParallelTasks(Task *task)");
906  struct threadargs *args(new threadargs);
907  args->task = (*iter);
908  args->sched = _mainSched;
909  args->execInst = this;
910  ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
911  }
912  for(ithread=0;ithread<tasks.size();ithread++)
913  ths[ithread].join();
914 }
static void * functionForTaskLoad(void *)
Definition: Executor.cxx:1140

References _mainSched, _tasks, _threadStackSize, driver_internal::args, DEBTRACE, and functionForTaskLoad().

Referenced by RunB().

◆ loadState()

bool Executor::loadState ( )

not yet implemented

Definition at line 698 of file Executor.cxx.

699 {
700  DEBTRACE("Executor::loadState()");
702  return true;
703 }

References _isRunningunderExternalControl, and DEBTRACE.

◆ loadTask() [1/2]

void Executor::loadTask ( Task task,
const Executor execInst 
)
protected

Perform loading of a Task.

Parameters
task: Task to load

Definition at line 853 of file Executor.cxx.

854 {
855  DEBTRACE("Executor::loadTask(Task *task)");
856  if(task->getState() != YACS::TOLOAD)
857  return;
858  traceExec(task, "state:TOLOAD", ComputePlacement(task));
859  {//Critical section
861  _mainSched->notifyFrom(task,YACS::START,execInst);
862  }//End of critical section
863  try
864  {
865  traceExec(task, "load", ComputePlacement(task));
866  task->load();
867  traceExec(task, "initService", ComputePlacement(task));
868  task->initService();
869  }
870  catch(Exception& ex)
871  {
872  std::cerr << ex.what() << std::endl;
873  {//Critical section
875  task->aborted();
876  _mainSched->notifyFrom(task,YACS::ABORT,execInst);
877  traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
878  }//End of critical section
879  }
880  catch(...)
881  {
882  std::cerr << "Load failed" << std::endl;
883  {//Critical section
885  task->aborted();
886  _mainSched->notifyFrom(task,YACS::ABORT,execInst);
887  traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
888  }//End of critical section
889  }
890 }
virtual void initService()=0
virtual void load()=0
@ START
Definition: define.hxx:58

References _mainSched, _mutexForSchedulerUpdate, YACS::ABORT, YACS::ENGINE::Task::aborted(), ComputePlacement(), DEBTRACE, testCppPluginInvokation::ex, YACS::ENGINE::Task::getState(), YACS::ENGINE::Node::getStateName(), YACS::ENGINE::Task::initService(), YACS::ENGINE::Task::load(), YACS::ENGINE::Scheduler::notifyFrom(), YACS::START, YACS::TOLOAD, and traceExec().

◆ loadTask() [2/2]

void Executor::loadTask ( Task task,
const WorkloadManager::RunInfo &  runInfo 
)

Definition at line 1423 of file Executor.cxx.

1424 {
1425  if(task->getState() != YACS::TOLOAD)
1426  return;
1427  traceExec(task, "state:TOLOAD", ComputePlacement(task));
1428  {//Critical section
1430  _mainSched->notifyFrom(task,YACS::START,this);
1431  }//End of critical section
1432  try
1433  {
1434  std::ostringstream container_name;
1435  container_name << runInfo.type.name << "-" << runInfo.index;
1436  task->imposeResource(runInfo.resource.name, container_name.str());
1437  traceExec(task, "load", ComputePlacement(task));
1438  task->load();
1439  traceExec(task, "initService", ComputePlacement(task));
1440  task->initService();
1441  }
1442  catch(Exception& ex)
1443  {
1444  std::cerr << ex.what() << std::endl;
1445  {//Critical section
1447  task->aborted();
1448  _mainSched->notifyFrom(task,YACS::ABORT, this);
1449  traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1450  }//End of critical section
1451  }
1452  catch(...)
1453  {
1454  std::cerr << "Load failed" << std::endl;
1455  {//Critical section
1457  task->aborted();
1458  _mainSched->notifyFrom(task,YACS::ABORT, this);
1459  traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
1460  }//End of critical section
1461  }
1462 }
virtual void imposeResource(const std::string &resource_name, const std::string &container_name)
Definition: Task.hxx:55

References _mainSched, _mutexForSchedulerUpdate, YACS::ABORT, YACS::ENGINE::Task::aborted(), ComputePlacement(), testCppPluginInvokation::ex, YACS::ENGINE::Task::getState(), YACS::ENGINE::Node::getStateName(), YACS::ENGINE::Task::imposeResource(), YACS::ENGINE::Task::initService(), YACS::ENGINE::Task::load(), YACS::ENGINE::Scheduler::notifyFrom(), YACS::START, YACS::TOLOAD, and traceExec().

Referenced by functionForTaskLoad(), YACS::ENGINE::WlmTask::run(), and RunA().

◆ makeDatastreamConnections()

void Executor::makeDatastreamConnections ( Task task)

Definition at line 1605 of file Executor.cxx.

1606 {
1609  return;
1610  try
1611  {
1612  task->connectService();
1613  traceExec(task, "connectService",ComputePlacement(task));
1614  {//Critical section
1616  task->connected();
1617  }//End of critical section
1618  }
1619  catch(Exception& ex)
1620  {
1621  std::cerr << ex.what() << std::endl;
1622  try
1623  {
1624  (task)->disconnectService();
1625  traceExec(task, "disconnectService",ComputePlacement(task));
1626  }
1627  catch(...)
1628  {
1629  // Disconnect has failed
1630  traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1631  }
1632  {//Critical section
1634  task->aborted();
1635  _mainSched->notifyFrom(task,YACS::ABORT,this);
1636  }//End of critical section
1637  }
1638  catch(...)
1639  {
1640  std::cerr << "Problem in connectService" << std::endl;
1641  try
1642  {
1643  (task)->disconnectService();
1644  traceExec(task, "disconnectService",ComputePlacement(task));
1645  }
1646  catch(...)
1647  {
1648  // Disconnect has failed
1649  traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1650  }
1651  {//Critical section
1653  task->aborted();
1654  _mainSched->notifyFrom(task,YACS::ABORT,this);
1655  }//End of critical section
1656  }
1657  if(task->getState() == YACS::ERROR)
1658  {
1659  //try to put all coupled tasks in error
1660  std::set<Task*> coupledSet;
1661  task->getCoupledTasks(coupledSet);
1662  for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
1663  {
1664  Task* t=*it;
1665  if(t == task)continue;
1666  if(t->getState() == YACS::ERROR)continue;
1667  try
1668  {
1669  t->disconnectService();
1670  traceExec(t, "disconnectService",ComputePlacement(task));
1671  }
1672  catch(...)
1673  {
1674  // Disconnect has failed
1675  traceExec(t, "disconnectService failed, ABORT",ComputePlacement(task));
1676  }
1677  {//Critical section
1679  t->aborted();
1681  }//End of critical section
1682  traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(task));
1683  }
1684  }
1685  traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1686 }
virtual void connected()=0
virtual void connectService()=0

References _mainSched, _mutexForSchedulerUpdate, YACS::ABORT, YACS::ENGINE::Task::aborted(), ComputePlacement(), YACS::ENGINE::Task::connected(), YACS::ENGINE::Task::connectService(), YACS::ERROR, testCppPluginInvokation::ex, YACS::ENGINE::Task::getCoupledTasks(), YACS::ENGINE::Task::getState(), YACS::ENGINE::Node::getStateName(), YACS::ENGINE::Scheduler::notifyFrom(), yacsorb.CORBAEngineTest::state, gui.Appli::t, YACS::TOLOAD, YACS::TORECONNECT, and traceExec().

Referenced by YACS::ENGINE::WlmTask::run().

◆ resume()

void Executor::resume ( bool  suspended)

This method is expected to be called in association with the suspendASAP method. It is expected to be called just after suspendASAP, with the output of suspendASAP as input parameter.

Definition at line 660 of file Executor.cxx.

661 {
662  if(suspended)
663  _mutexForSchedulerUpdate.unLock();
664 }

References _mutexForSchedulerUpdate.

Referenced by gui.Appli.Browser::run(), gui.Appli.Browser::step(), and gui.Appli.Browser::susp().

◆ resumeCurrentBreakPoint()

bool Executor::resumeCurrentBreakPoint ( )

wake up executor when in pause

When the Executor is in the paused state or waiting for task completion, the thread running the RunB loop waits on the condition _condForStepByStep. This method wakes up the RunB thread.

Returns
true when the executor has actually been woken up

Definition at line 448 of file Executor.cxx.

449 {
450  DEBTRACE("Executor::resumeCurrentBreakPoint()");
451  bool ret = false;
452  //bool doDump = false;
453  { // --- Critical section
456  DEBTRACE("_executorState: " << _executorState);
457  switch (_executorState)
458  {
459  case YACS::WAITINGTASKS:
460  case YACS::PAUSED:
461  {
462  _condForStepByStep.notify_all();
464  sendEvent("executor");
465  ret = true;
466  //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
467  break;
468  }
469  case YACS::FINISHED:
470  case YACS::STOPPED:
471  {
472  //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
473  DEBTRACE("Graph Execution finished or stopped !");
474  break;
475  }
476  default :
477  {
478  // debug: no easy way to verify if main loop is acutally waiting on condition
479  }
480  }
481  DEBTRACE("---");
482  //if (doDump) saveState(_dumpErrorFile);
483  } // --- End of critical section
484  return ret;
485 }

References _condForStepByStep, _executorState, _isRunningunderExternalControl, _mutexForSchedulerUpdate, DEBTRACE, YACS::FINISHED, YACS::PAUSED, YACS::RUNNING, sendEvent(), YACS::STOPPED, and YACS::WAITINGTASKS.

Referenced by main(), and stopExecution().

◆ RunA()

void Executor::RunA ( Scheduler graph,
int  debug = 0,
bool  fromScratch = true 
)

Execute a graph waiting for completion.

Parameters
graph: schema to execute
debug: display the graph with dot if debug > 0 (values above 1 and above 2 display the graph at additional stages of the loop)
fromScratch: if true the graph is reinitialized

Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute

Calls Executor::launchTask to execute a selected Task.

Completion when graph is finished (Scheduler::isFinished)

Definition at line 116 of file Executor.cxx.

117 {
118  DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
119  _mainSched=graph;
120  _root = dynamic_cast<ComposedNode *>(_mainSched);
121  if (!_root) throw Exception("Executor::Run, Internal Error!");
122  bool isMore;
123  int i=0;
124  if(debug>1)_displayDot(graph);
125  if (fromScratch)
126  {
127  graph->init();
128  graph->exUpdateState();
129  }
130  if(debug>1)_displayDot(graph);
131  vector<Task *> tasks;
132  vector<Task *>::iterator iter;
133  _toContinue=true;
137  _runningTasks.clear();
139  while(_toContinue)
140  {
142 
143  if(debug>2)_displayDot(graph);
144 
145  {//Critical section
147  tasks=graph->getNextTasks(isMore);
148  graph->selectRunnableTasks(tasks);
149  }//End of critical section
150 
151  if(debug>2)_displayDot(graph);
152 
153  for(iter=tasks.begin();iter!=tasks.end();iter++)
154  loadTask(*iter,this);
155 
156  if(debug>1)_displayDot(graph);
157 
158  launchTasks(tasks);
159 
160  if(debug>1)_displayDot(graph);
161 
162  {//Critical section
164  _toContinue=!graph->isFinished();
165  }//End of critical section
166  DEBTRACE("_toContinue: " << _toContinue);
167 
168  if(debug>0)_displayDot(graph);
169 
170  i++;
171  }
172 }
void launchTasks(const std::vector< Task * > &tasks)
Execute a list of tasks possibly connected through datastream links.
Definition: Executor.cxx:921
void sleepWhileNoEventsFromAnyRunningTask()
wait until a running task ends
Definition: Executor.cxx:1082
virtual std::vector< Task * > getNextTasks(bool &isMore)=0
virtual bool isFinished()=0
virtual void init(bool start=true)=0
virtual void selectRunnableTasks(std::vector< Task * > &tasks)=0
virtual void exUpdateState()=0

References _displayDot(), _execMode, _isWaitingEventsFromRunningTasks, _mainSched, _mutexForSchedulerUpdate, _numberOfEndedTasks, _numberOfRunningTasks, _root, _runningTasks, _toContinue, YACS::CONTINUE, DEBTRACE, YACS::ENGINE::Scheduler::exUpdateState(), YACS::ENGINE::Scheduler::getNextTasks(), yacsorb.CORBAEngineTest::i, YACS::ENGINE::Scheduler::init(), YACS::ENGINE::Scheduler::isFinished(), launchTasks(), loadTask(), YACS::ENGINE::Scheduler::selectRunnableTasks(), and sleepWhileNoEventsFromAnyRunningTask().

◆ RunB()

void Executor::RunB ( Scheduler graph,
int  debug = 0,
bool  fromScratch = true 
)

Execute a graph with breakpoints or step by step.

To be launched in a thread (the main thread controls the progression).

Parameters
graph: schema to execute
debug: display the graph with dot if debug >0
fromScratch: if false, the state from a previous partial execution is already loaded

Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute

Calls Executor::checkBreakPoints to verify if a pause is requested

Calls Executor::launchTask to execute a selected Task

Completion when graph is finished (Scheduler::isFinished)

States of execution:

Modes of Execution:

A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready. Step by Step means execution node by node, or group of nodes by group of nodes. At a given step, the user decides to launch all the ready nodes or only a subset (Caution: some nodes must run in parallel). The next event (end of task) may give a new set of ready nodes, and define a new step.

The graph execution may be controlled by a pilot which sends requests. Requests are asynchronous. Requests are only taken into account in certain states; otherwise the status IgnoredRequest is returned.

If the pilot wants to wait for the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:

TO BE VALIDATED:

  • Pilot may connect to the executor during execution, or disconnect.
  • Several Pilots may be connected at the same time (for observation...)

Definition at line 235 of file Executor.cxx.

236 {
237  DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
238 
239  { // --- Critical section
241  _mainSched = graph;
242  _root = dynamic_cast<ComposedNode *>(_mainSched);
243  if (!_root) throw Exception("Executor::Run, Internal Error!");
245  sendEvent("executor");
246  _toContinue=true;
247  _isOKToEnd = false;
248  _errorDetected = false;
251  _runningTasks.clear();
253  string tracefile = "traceExec_";
254  tracefile += _mainSched->getName();
255  _trace.open(tracefile.c_str());
256  _start = std::chrono::steady_clock::now();
257  } // --- End of critical section
258 
259  if (debug > 1) _displayDot(graph);
260 
261  if (fromScratch)
262  {
263  try
264  {
265  graph->init();
266  graph->exUpdateState();
267  }
268  catch(Exception& ex)
269  {
270  DEBTRACE("exception: "<< (ex.what()));
272  sendEvent("executor");
273  throw;
274  }
275  }
277  sendEvent("executor");
278 
279  if (debug > 1) _displayDot(graph);
280 
281  vector<Task *>::iterator iter;
282  bool isMore;
283  int problemCount=0;
284  int numberAllTasks;
285 
287  sendEvent("executor");
288  while (_toContinue)
289  {
290  DEBTRACE("--- executor main loop");
292  DEBTRACE("--- events...");
293  if (debug > 2) _displayDot(graph);
294  { // --- Critical section
296  std::vector<Task *> tasks = graph->getNextTasks(isMore);
297  graph->selectRunnableTasks(tasks);
299  _tasks = tasks;
300  numberAllTasks=_numberOfRunningTasks+_tasks.size();
301  } // --- End of critical section
302  if (debug > 2) _displayDot(graph);
304  {
305  if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306  if (debug > 0) _displayDot(graph);
307  DEBTRACE("---");
309  if (debug > 1) _displayDot(graph);
310  DEBTRACE("---");
312  DEBTRACE("---");
313  }
314  if (debug > 1) _displayDot(graph);
315  { // --- Critical section
316  DEBTRACE("---");
318  //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
319  if(_numberOfRunningTasks == 0)
320  _toContinue = !graph->isFinished();
321 
322  DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
323  DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
324  DEBTRACE("_toContinue: " << _toContinue);
325  if(_toContinue && numberAllTasks==0)
326  {
327  //Problem : no running tasks and no task to launch ??
328  problemCount++;
329  std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
330  //Pause to give a chance to interrupt
331  usleep(1000);
332  if(problemCount > 25)
333  {
334  // Too much problems encountered : stop execution
335  _toContinue=false;
336  }
337  }
338 
339  if (! _toContinue)
340  {
342  sendEvent("executor");
343  _condForPilot.notify_all();
344  }
345  } // --- End of critical section
346  if (debug > 0) _displayDot(graph);
347  DEBTRACE("_toContinue: " << _toContinue);
348  }
349 
350  DEBTRACE("End of main Loop");
351 
352  { // --- Critical section
354  if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
355  {
356  DEBTRACE("stop requested: End soon");
358  _toContinue = false;
359  sendEvent("executor");
360  }
361  } // --- End of critical section
363  {
365  }
366  {
368  _trace.close();
369  }
370  DEBTRACE("End of RunB thread");
371 }
YACS::BASES::Mutex _mutexForTrace
Definition: Executor.hxx:74
bool saveState(const std::string &xmlFile)
save the current state of execution in an xml file
Definition: Executor.cxx:678
void loadParallelTasks(const std::vector< Task * > &tasks, const Executor *execInst)
Definition: Executor.cxx:899
bool checkBreakPoints()
Wait reactivation in modes Step By step or with BreakPoints.
Definition: Executor.cxx:752
std::string _dumpErrorFile
Definition: Executor.hxx:94
std::ofstream _trace
Definition: Executor.hxx:93
void filterTasksConsideringContainers(std::vector< Task * > &tsks)
Definition: Executor.cxx:1352
std::chrono::steady_clock::time_point _start
Definition: Executor.hxx:98
virtual std::string getName() const =0

References _condForPilot, _displayDot(), _dumpErrorFile, _dumpOnErrorRequested, _errorDetected, _executorState, _isOKToEnd, _isWaitingEventsFromRunningTasks, _mainSched, _mutexForSchedulerUpdate, _mutexForTrace, _numberOfEndedTasks, _numberOfRunningTasks, _root, _runningTasks, _start, _tasks, _toContinue, _trace, checkBreakPoints(), DEBTRACE, testCppPluginInvokation::ex, YACS::ENGINE::Scheduler::exUpdateState(), filterTasksConsideringContainers(), YACS::FINISHED, YACS::ENGINE::Scheduler::getName(), YACS::ENGINE::Scheduler::getNextTasks(), YACS::ENGINE::Scheduler::init(), YACS::INITIALISED, YACS::ENGINE::Scheduler::isFinished(), launchTasks(), loadParallelTasks(), YACS::NOTYETINITIALIZED, YACS::RUNNING, saveState(), YACS::ENGINE::Scheduler::selectRunnableTasks(), sendEvent(), sleepWhileNoEventsFromAnyRunningTask(), and YACS::STOPPED.

Referenced by executorFunc(), and RunW().

◆ runTask()

YACS::Event Executor::runTask ( Task task)

Definition at line 1535 of file Executor.cxx.

1536 {
1537  { // --- Critical section
1539  task->begin(); //change state to ACTIVATED
1540  }
1541  traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1542 
1543  if(getDPLScopeSensitive())
1544  {
1545  Node *node(dynamic_cast<Node *>(task));
1546  ComposedNode *gfn(dynamic_cast<ComposedNode *>(_mainSched));
1547  if(node!=0 && gfn!=0)
1548  node->applyDPLScope(gfn);
1549  }
1550 
1552  try
1553  {
1554  traceExec(task, "start execution",ComputePlacement(task));
1555  task->execute();
1556  traceExec(task, "end execution OK",ComputePlacement(task));
1557  }
1558  catch(Exception& ex)
1559  {
1560  std::cerr << "YACS Exception during execute" << std::endl;
1561  std::cerr << ex.what() << std::endl;
1562  ev=YACS::ABORT;
1563  string message = "end execution ABORT, ";
1564  message += ex.what();
1565  traceExec(task, message,ComputePlacement(task));
1566  }
1567  catch(...)
1568  {
1569  // Execution has failed
1570  std::cerr << "Execution has failed: unknown reason" << std::endl;
1571  ev=YACS::ABORT;
1572  traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1573  }
1574 
1575  // Disconnect task
1576  try
1577  {
1578  DEBTRACE("task->disconnectService()");
1579  task->disconnectService();
1580  traceExec(task, "disconnectService",ComputePlacement(task));
1581  }
1582  catch(...)
1583  {
1584  // Disconnect has failed
1585  std::cerr << "disconnect has failed" << std::endl;
1586  ev=YACS::ABORT;
1587  traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1588  }
1589  //
1590 
1591  std::string placement(ComputePlacement(task));
1592 
1593  // container management for HomogeneousPoolOfContainer
1594 
1595  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1596  if(contC)
1597  {
1598  std::lock_guard<std::mutex> alckCont(contC->getLocker());
1599  contC->release(task);
1600  }
1601 
1602  return ev;
1603 }

References _mainSched, _mutexForSchedulerUpdate, YACS::ABORT, YACS::ENGINE::Node::applyDPLScope(), YACS::ENGINE::Task::begin(), ComputePlacement(), DEBTRACE, YACS::ENGINE::Task::disconnectService(), testCppPluginInvokation::ex, YACS::ENGINE::Task::execute(), YACS::FINISH, YACS::ENGINE::Task::getContainer(), getDPLScopeSensitive(), YACS::ENGINE::HomogeneousPoolContainer::getLocker(), YACS::ENGINE::Task::getState(), YACS::ENGINE::Node::getStateName(), YACS::ENGINE::HomogeneousPoolContainer::release(), and traceExec().

Referenced by YACS::ENGINE::WlmTask::run().

◆ RunW()

void Executor::RunW ( Scheduler graph,
int  debug = 0,
bool  fromScratch = true 
)

Definition at line 1833 of file Executor.cxx.

1834 {
1835  std::string str_value = graph->getProperty("executor");
1836  if(str_value == "WorkloadManager"
1837  || str_value == "WORKLOADMANAGER"
1838  || str_value == "workloadmanager"
1839  || str_value == "WorkLoadManager")
1840  runWlm(graph, debug, fromScratch);
1841  else
1842  RunB(graph, debug, fromScratch);
1843 }
void RunB(Scheduler *graph, int debug=0, bool fromScratch=true)
Execute a graph with breakpoints or step by step.
Definition: Executor.cxx:235
void runWlm(Scheduler *graph, int debug=0, bool fromScratch=true)
Definition: Executor.cxx:1688
virtual std::string getProperty(const std::string &name)=0

References YACS::ENGINE::Scheduler::getProperty(), RunB(), and runWlm().

Referenced by driverTest(), and main().

◆ runWlm()

void Executor::runWlm ( Scheduler graph,
int  debug = 0,
bool  fromScratch = true 
)

Definition at line 1688 of file Executor.cxx.

1689 {
1690  DEBTRACE("Executor::runWlm debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
1691  { // --- Critical section
1693  _mainSched = graph;
1694  _root = dynamic_cast<ComposedNode *>(_mainSched);
1695  if (!_root) throw Exception("Executor::Run, Internal Error!");
1697  sendEvent("executor");
1698  _toContinue=true;
1699  _isOKToEnd = false;
1700  _errorDetected = false;
1703  _runningTasks.clear();
1704  _numberOfEndedTasks = 0;
1705  string tracefile = "traceExec_";
1706  tracefile += _mainSched->getName();
1707  _trace.open(tracefile.c_str());
1708  _start = std::chrono::steady_clock::now();
1709  } // --- End of critical section
1710 
1711  if (debug > 1) _displayDot(graph);
1712 
1713  if (fromScratch)
1714  {
1715  try
1716  {
1717  graph->init();
1718  graph->exUpdateState();
1719  }
1720  catch(Exception& ex)
1721  {
1722  DEBTRACE("exception: "<< (ex.what()));
1724  sendEvent("executor");
1725  throw;
1726  }
1727  }
1729  sendEvent("executor");
1730 
1731  if (debug > 1) _displayDot(graph);
1732 
1733  bool isMore;
1734  int problemCount=0;
1735  int numberAllTasks;
1736 
1738  sendEvent("executor");
1739 
1740  WorkloadManager::DefaultAlgorithm algo;
1741  WorkloadManager::WorkloadManager wlm(algo);
1743  wlm.start();
1744 
1745  while (_toContinue)
1746  {
1747  DEBTRACE("--- executor main loop");
1749  DEBTRACE("--- events...");
1750  if (debug > 2) _displayDot(graph);
1751  { // --- Critical section
1753  std::vector<Task *> readyTasks=graph->getNextTasks(isMore);
1754  graph->selectRunnableTasks(readyTasks);
1755  _tasks.clear();
1756  for(Task * t : readyTasks)
1757  if(_runningTasks.find(t) == _runningTasks.end())
1758  _tasks.push_back(t);
1759  // TODO: to be removed
1761  numberAllTasks=_numberOfRunningTasks+_tasks.size();
1762  } // --- End of critical section
1763  if (debug > 2) _displayDot(graph);
1764  DEBTRACE("--- events...");
1766  {
1767  if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
1768  for(Task * task : _tasks)
1769  {
1770  beginTask(task);
1771  WlmTask* newTask = new WlmTask(*this, task);
1772  wlm.addTask(newTask);
1773  }
1774  }
1775  if (debug > 1) _displayDot(graph);
1776  { // --- Critical section
1777  DEBTRACE("---");
1779  //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
1780  _toContinue = !graph->isFinished();
1781 
1782  DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
1783  DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
1784  DEBTRACE("_toContinue: " << _toContinue);
1785  if(_toContinue && numberAllTasks==0)
1786  {
1787  //Problem : no running tasks and no task to launch ??
1788  problemCount++;
1789  std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
1790  //Pause to give a chance to interrupt
1791  usleep(1000);
1792  if(problemCount > 25)
1793  {
1794  // Too much problems encountered : stop execution
1795  _toContinue=false;
1796  }
1797  }
1798 
1799  if (! _toContinue)
1800  {
1802  sendEvent("executor");
1803  _condForPilot.notify_all();
1804  }
1805  } // --- End of critical section
1806  if (debug > 0) _displayDot(graph);
1807  DEBTRACE("_toContinue: " << _toContinue);
1808  }
1809 
1810  wlm.stop();
1811  DEBTRACE("End of main Loop");
1812 
1813  { // --- Critical section
1815  if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
1816  {
1817  DEBTRACE("stop requested: End soon");
1819  _toContinue = false;
1820  sendEvent("executor");
1821  }
1822  } // --- End of critical section
1824  {
1826  }
1827  {
1829  _trace.close();
1830  }
1831 }
void beginTask(Task *task)
Definition: Executor.cxx:1464
static void loadResources(WorkloadManager::WorkloadManager &wm)
Definition: WlmTask.cxx:84

References _condForPilot, _displayDot(), _dumpErrorFile, _dumpOnErrorRequested, _errorDetected, _executorState, _isOKToEnd, _isWaitingEventsFromRunningTasks, _mainSched, _mutexForSchedulerUpdate, _mutexForTrace, _numberOfEndedTasks, _numberOfRunningTasks, _root, _runningTasks, _start, _tasks, _toContinue, _trace, beginTask(), checkBreakPoints(), DEBTRACE, testCppPluginInvokation::ex, YACS::ENGINE::Scheduler::exUpdateState(), filterTasksConsideringContainers(), YACS::FINISHED, YACS::ENGINE::Scheduler::getName(), YACS::ENGINE::Scheduler::getNextTasks(), YACS::ENGINE::Scheduler::init(), YACS::INITIALISED, YACS::ENGINE::Scheduler::isFinished(), YACS::ENGINE::WlmTask::loadResources(), YACS::NOTYETINITIALIZED, YACS::RUNNING, saveState(), YACS::ENGINE::Scheduler::selectRunnableTasks(), sendEvent(), sleepWhileNoEventsFromAnyRunningTask(), YACS::STOPPED, and gui.Appli::t.

Referenced by RunW().

◆ saveState()

bool Executor::saveState ( const std::string &  xmlFile)

save the current state of execution in an xml file

Definition at line 678 of file Executor.cxx.

679 {
680  DEBTRACE("Executor::saveState() in " << xmlFile);
681  bool result = false;
682  try {
685  vst.openFileDump(xmlFile.c_str());
686  _root->accept(&vst);
687  vst.closeFileDump();
688  result = true;
689  }
690  catch(Exception& ex) {
691  std::cerr << ex.what() << std::endl;
692  }
693  return result;
694 }
void accept(Visitor *visitor)

References _mutexForSchedulerUpdate, _root, YACS::ENGINE::ComposedNode::accept(), YACS::ENGINE::VisitorSaveState::closeFileDump(), DEBTRACE, testCppPluginInvokation::ex, and YACS::ENGINE::VisitorSaveState::openFileDump().

Referenced by RunB(), and runWlm().

◆ sendEvent()

void Executor::sendEvent ( const std::string &  event)
protectedvirtual

emit notification to all observers registered with the dispatcher

The dispatcher is unique and can be obtained by getDispatcher()

Definition at line 1324 of file Executor.cxx.

1325 {
1327  YASSERT(disp);
1328  YASSERT(_root);
1329  disp->dispatch(_root,event);
1330 }
#define YASSERT(val)
YASSERT macro is always defined, used like assert, but throw a YACS::Exception instead of abort.
Definition: YacsTrace.hxx:59
Base class for dispatcher in observer pattern.
Definition: Dispatcher.hxx:74
virtual void dispatch(Node *object, const std::string &event)
Definition: Dispatcher.cxx:85
static Dispatcher * getDispatcher()
Definition: Dispatcher.cxx:55

References _root, YACS::ENGINE::Dispatcher::dispatch(), YACS::ENGINE::Dispatcher::getDispatcher(), and YASSERT.

Referenced by checkBreakPoints(), endTask(), functionForTaskExecution(), resumeCurrentBreakPoint(), RunB(), and runWlm().

◆ setDPLScopeSensitive()

void YACS::ENGINE::Executor::setDPLScopeSensitive ( bool  newVal)
inline

Definition at line 108 of file Executor.hxx.

108 { _DPLScopeSensitive=newVal; }

◆ setExecMode()

void Executor::setExecMode ( YACS::ExecutionMode  mode)

Dynamically set the current mode of execution.

The mode can be Continue, step by step, or stop before execution of a node defined in a list of breakpoints.

Definition at line 430 of file Executor.cxx.

431 {
432  DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
433  { // --- Critical section
436  _execMode = mode;
437  } // --- End of critical section
438 }

References _execMode, _isRunningunderExternalControl, _mutexForSchedulerUpdate, and DEBTRACE.

Referenced by main(), and stopExecution().

◆ setKeepGoingProperty()

void YACS::ENGINE::Executor::setKeepGoingProperty ( bool  newVal)
inline

Definition at line 106 of file Executor.hxx.

106 { _keepGoingOnFail=newVal; }

◆ setListOfBreakPoints()

void Executor::setListOfBreakPoints ( std::list< std::string >  listOfBreakPoints)

define a list of node names as breakpoints in the graph

Definition at line 491 of file Executor.cxx.

492 {
493  DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
494  { // --- Critical section
497  _listOfBreakPoints = listOfBreakPoints;
498  } // --- End of critical section
499 }

References _isRunningunderExternalControl, _listOfBreakPoints, _mutexForSchedulerUpdate, and DEBTRACE.

◆ setMaxNbOfThreads()

void Executor::setMaxNbOfThreads ( int  maxNbThreads)

Definition at line 1116 of file Executor.cxx.

1117 {
1118  _maxNbThreads = static_cast< std::uint32_t >(maxNbThreads);
1119 }

References _maxNbThreads.

◆ setStepsToExecute()

bool Executor::setStepsToExecute ( std::list< std::string >  listToExecute)

Define a subset of tasks to execute in step-by-step mode.

Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad in the current step. If some nodes must run in parallel, they must stay together in the list.

Definition at line 545 of file Executor.cxx.

546 {
547  DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
548  bool ret = true;
549  vector<Task *>::iterator iter;
550  vector<Task *> restrictedTasks;
551  { // --- Critical section
554  switch (_executorState)
555  {
556  case YACS::WAITINGTASKS:
557  case YACS::PAUSED:
558  {
559  for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
560  {
561  string readyNode = _mainSched->getTaskName(*iter);
562  if (find(listToExecute.begin(), listToExecute.end(), readyNode)
563  != listToExecute.end())
564  {
565  restrictedTasks.push_back(*iter);
566  DEBTRACE("node to execute " << readyNode);
567  }
568  }
569  _tasks.clear();
570  for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
571  {
572  _tasks.push_back(*iter);
573  }
574  break;
575  }
577  case YACS::INITIALISED:
578  case YACS::RUNNING:
579  case YACS::FINISHED:
580  case YACS::STOPPED:
581  default:
582  {
583  break;
584  }
585  }
586  } // --- End of critical section
587 
588  _tasks.clear();
589  for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
590  {
591  _tasks.push_back(*iter);
592  }
593  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
594  {
595  string readyNode = _mainSched->getTaskName(*iter);
596  DEBTRACE("selected node to execute " << readyNode);
597  }
598 
599  return ret;
600 }

References _executorState, _isRunningunderExternalControl, _mainSched, _mutexForSchedulerUpdate, _tasks, _tasksSave, DEBTRACE, YACS::FINISHED, YACS::ENGINE::Scheduler::getTaskName(), YACS::INITIALISED, YACS::NOTYETINITIALIZED, YACS::PAUSED, YACS::RUNNING, YACS::STOPPED, and YACS::WAITINGTASKS.

◆ setStopOnError()

void Executor::setStopOnError ( bool  dumpRequested = false,
std::string  xmlFile = "" 
)

ask to stop execution on the first node found in error

Parameters
dumpRequested  produce a state dump when an error is found
xmlFile  name of file used for state dump

Definition at line 399 of file Executor.cxx.

400 {
401  { // --- Critical section
403  _dumpErrorFile=xmlFile;
405  _dumpOnErrorRequested = dumpRequested;
406  if (dumpRequested && xmlFile.empty())
407  throw YACS::Exception("dump on error requested and no filename given for dump");
408  DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
409  } // --- End of critical section
410 }

References _dumpErrorFile, _dumpOnErrorRequested, _mutexForSchedulerUpdate, _stopOnErrorRequested, and DEBTRACE.

Referenced by main().

◆ sleepWhileNoEventsFromAnyRunningTask()

void Executor::sleepWhileNoEventsFromAnyRunningTask ( )
protected

wait until a running task ends

Definition at line 1082 of file Executor.cxx.

1083 {
1084  DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1085 // _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1088  {
1090  _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1091  }
1093  DEBTRACE("---");
1094 }
YACS::BASES::Condition _condForNewTasksToPerform
Definition: Executor.hxx:69

References _condForNewTasksToPerform, _isWaitingEventsFromRunningTasks, _mutexForSchedulerUpdate, _numberOfEndedTasks, _numberOfRunningTasks, and DEBTRACE.

Referenced by RunA(), RunB(), and runWlm().

◆ stopExecution()

void Executor::stopExecution ( )

stops the execution as soon as possible

Definition at line 668 of file Executor.cxx.

669 {
671  //waitPause();
672  _isOKToEnd = true;
674 }
void setExecMode(YACS::ExecutionMode mode)
Dynamically set the current mode of execution.
Definition: Executor.cxx:430
bool resumeCurrentBreakPoint()
wake up executor when in pause
Definition: Executor.cxx:448

References _isOKToEnd, resumeCurrentBreakPoint(), setExecMode(), and YACS::STEPBYSTEP.

◆ suspendASAP()

bool Executor::suspendASAP ( )

This method can be called at any time, concurrently with a RunB call. It waits until the executor is locked in a consistent state of a running graph.

This method is expected to be called in association with the resume method. The returned value is expected to be passed to the resume method.

Definition at line 643 of file Executor.cxx.

644 {
645  // no AutoLocker here. It's not a bug.
648  {// execution is finished
649  _mutexForSchedulerUpdate.unLock();
650  return false;// the executor is no more running
651  }
652  //general case. Leave method with locker in locked status
653  return true;
654 }

References _executorState, _mutexForSchedulerUpdate, _toContinue, and YACS::FINISHED.

◆ traceExec()

void Executor::traceExec ( Task task,
const std::string &  message,
const std::string &  placement 
)
protected

Definition at line 1301 of file Executor.cxx.

1302 {
1303  string nodeName = _mainSched->getTaskName(task);
1304  Container *cont = task->getContainer();
1305  string containerName = "---";
1306  if (cont)
1307  containerName = cont->getName();
1308 
1309  std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
1310  std::chrono::milliseconds millisec;
1311  millisec = std::chrono::duration_cast<std::chrono::milliseconds>(now -_start);
1312  double elapse = double(millisec.count()) / 1000.0;
1313  {
1315  _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1316  _trace << flush;
1317  }
1318 }
virtual std::string getName() const
Definition: Container.hxx:81

References _mainSched, _mutexForTrace, _start, _trace, YACS::ENGINE::Task::getContainer(), YACS::ENGINE::Container::getName(), YACS::ENGINE::Scheduler::getTaskName(), and threadargs::task.

Referenced by functionForTaskExecution(), launchTask(), launchTasks(), loadTask(), makeDatastreamConnections(), and runTask().

◆ unsetStopOnError()

void Executor::unsetStopOnError ( )

ask not to stop execution on nodes found in error

Definition at line 416 of file Executor.cxx.

417 {
418  { // --- Critical section
420  _stopOnErrorRequested=false;
421  } // --- End of critical section
422 }

References _mutexForSchedulerUpdate, and _stopOnErrorRequested.

◆ waitPause()

void Executor::waitPause ( )

suspend pilot execution until Executor is paused or waiting for tasks completion.

Do nothing if execution is finished or paused. Wait for the first step if Executor is running or in initialization.

Definition at line 608 of file Executor.cxx.

609 {
610  DEBTRACE("Executor::waitPause()" << _executorState);
611  { // --- Critical section
614  switch (_executorState)
615  {
616  default:
617  case YACS::STOPPED:
618  case YACS::FINISHED:
619  case YACS::WAITINGTASKS:
620  case YACS::PAUSED:
621  {
622  break;
623  }
625  case YACS::INITIALISED:
626  case YACS::RUNNING:
627  {
628  _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
629  break;
630  }
631  }
632  } // --- End of critical section
633  DEBTRACE("---");
634 }

References _condForPilot, _executorState, _isRunningunderExternalControl, _mutexForSchedulerUpdate, DEBTRACE, YACS::FINISHED, YACS::INITIALISED, YACS::NOTYETINITIALIZED, YACS::PAUSED, YACS::RUNNING, YACS::STOPPED, and YACS::WAITINGTASKS.

◆ waitResume()

void Executor::waitResume ( )
protected

in step-by-step or breakpoint mode, wait until the pilot resumes the execution

With the condition Mutex, the mutex is released atomically during the wait. Pilot calls Executor::resumeCurrentBreakPoint to resume execution. Must be called while mutex is locked.

Definition at line 840 of file Executor.cxx.

841 {
842  DEBTRACE("Executor::waitResume()");
843  _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
844  DEBTRACE("---");
845 }

References _condForStepByStep, _mutexForSchedulerUpdate, and DEBTRACE.

Referenced by checkBreakPoints().

◆ wakeUp()

void Executor::wakeUp ( )
protected

must be called while holding _mutexForSchedulerUpdate!

Definition at line 1099 of file Executor.cxx.

1100 {
1101  DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1103  {
1105  _condForNewTasksToPerform.notify_all();
1106  }
1107  else
1109 }

References _condForNewTasksToPerform, _isWaitingEventsFromRunningTasks, _numberOfEndedTasks, and DEBTRACE.

Referenced by endTask(), and functionForTaskExecution().

Member Data Documentation

◆ _condForNewTasksToPerform

YACS::BASES::Condition YACS::ENGINE::Executor::_condForNewTasksToPerform
protected

Definition at line 69 of file Executor.hxx.

Referenced by sleepWhileNoEventsFromAnyRunningTask(), and wakeUp().

◆ _condForPilot

YACS::BASES::Condition YACS::ENGINE::Executor::_condForPilot
protected

◆ _condForStepByStep

YACS::BASES::Condition YACS::ENGINE::Executor::_condForStepByStep
protected

◆ _DPLScopeSensitive

bool YACS::ENGINE::Executor::_DPLScopeSensitive
protected

specifies if scope DynParaLoop is active or not. False by default.

Definition at line 97 of file Executor.hxx.

◆ _dumpErrorFile

std::string YACS::ENGINE::Executor::_dumpErrorFile
protected

Definition at line 94 of file Executor.hxx.

Referenced by RunB(), runWlm(), and setStopOnError().

◆ _dumpOnErrorRequested

bool YACS::ENGINE::Executor::_dumpOnErrorRequested
protected

Definition at line 79 of file Executor.hxx.

Referenced by Executor(), RunB(), runWlm(), and setStopOnError().

◆ _errorDetected

bool YACS::ENGINE::Executor::_errorDetected
protected

Definition at line 80 of file Executor.hxx.

Referenced by endTask(), Executor(), functionForTaskExecution(), RunB(), and runWlm().

◆ _execMode

YACS::ExecutionMode YACS::ENGINE::Executor::_execMode
protected

◆ _executorState

◆ _isOKToEnd

bool YACS::ENGINE::Executor::_isOKToEnd
protected

◆ _isRunningunderExternalControl

◆ _isWaitingEventsFromRunningTasks

bool YACS::ENGINE::Executor::_isWaitingEventsFromRunningTasks
protected

Definition at line 82 of file Executor.hxx.

Referenced by RunA(), RunB(), runWlm(), sleepWhileNoEventsFromAnyRunningTask(), and wakeUp().

◆ _keepGoingOnFail

bool YACS::ENGINE::Executor::_keepGoingOnFail
protected

Definition at line 95 of file Executor.hxx.

◆ _listOfBreakPoints

std::list<std::string> YACS::ENGINE::Executor::_listOfBreakPoints
protected

Definition at line 89 of file Executor.hxx.

Referenced by checkBreakPoints(), and setListOfBreakPoints().

◆ _listOfTasksToLoad

std::list<std::string> YACS::ENGINE::Executor::_listOfTasksToLoad
protected

Definition at line 90 of file Executor.hxx.

Referenced by checkBreakPoints(), and getTasksToLoad().

◆ _mainSched

◆ _maxNbThreads

std::uint32_t YACS::ENGINE::Executor::_maxNbThreads = 10000
protected

◆ _maxThreads

int Executor::_maxThreads
static

Definition at line 131 of file Executor.hxx.

Referenced by Executor(), launchTask(), and YACS::ENGINE::Runtime::Runtime().

◆ _mutexForNbOfConcurrentThreads

YACS::BASES::Mutex YACS::ENGINE::Executor::_mutexForNbOfConcurrentThreads
protected

Definition at line 68 of file Executor.hxx.

Referenced by getNbOfThreads().

◆ _mutexForSchedulerUpdate

◆ _mutexForTrace

YACS::BASES::Mutex YACS::ENGINE::Executor::_mutexForTrace
protected

Definition at line 74 of file Executor.hxx.

Referenced by RunB(), runWlm(), and traceExec().

◆ _nbOfConcurrentThreads

int YACS::ENGINE::Executor::_nbOfConcurrentThreads
protected

Definition at line 67 of file Executor.hxx.

◆ _numberOfEndedTasks

int YACS::ENGINE::Executor::_numberOfEndedTasks
protected

Definition at line 85 of file Executor.hxx.

Referenced by Executor(), RunA(), RunB(), runWlm(), sleepWhileNoEventsFromAnyRunningTask(), and wakeUp().

◆ _numberOfRunningTasks

int YACS::ENGINE::Executor::_numberOfRunningTasks
protected

◆ _root

ComposedNode* YACS::ENGINE::Executor::_root
protected

Definition at line 66 of file Executor.hxx.

Referenced by Executor(), RunA(), RunB(), runWlm(), saveState(), and sendEvent().

◆ _runningTasks

std::set<Task *> YACS::ENGINE::Executor::_runningTasks
protected

◆ _semForMaxThreads

YACS::BASES::Semaphore YACS::ENGINE::Executor::_semForMaxThreads
protected

Definition at line 70 of file Executor.hxx.

Referenced by functionForTaskExecution(), and launchTask().

◆ _semThreadCnt

int YACS::ENGINE::Executor::_semThreadCnt
protected

Definition at line 86 of file Executor.hxx.

Referenced by Executor(), functionForTaskExecution(), and launchTask().

◆ _start

std::chrono::steady_clock::time_point YACS::ENGINE::Executor::_start
protected

Definition at line 98 of file Executor.hxx.

Referenced by RunB(), runWlm(), and traceExec().

◆ _stopOnErrorRequested

bool YACS::ENGINE::Executor::_stopOnErrorRequested
protected

◆ _tasks

std::vector<Task *> YACS::ENGINE::Executor::_tasks
protected

Definition at line 91 of file Executor.hxx.

Referenced by checkBreakPoints(), loadParallelTasks(), RunB(), runWlm(), and setStepsToExecute().

◆ _tasksSave

std::vector<Task *> YACS::ENGINE::Executor::_tasksSave
protected

Definition at line 92 of file Executor.hxx.

Referenced by checkBreakPoints(), and setStepsToExecute().

◆ _threadStackSize

size_t Executor::_threadStackSize
static

Definition at line 132 of file Executor.hxx.

Referenced by launchTask(), loadParallelTasks(), and YACS::ENGINE::Runtime::Runtime().

◆ _toContinue

bool YACS::ENGINE::Executor::_toContinue
protected

Definition at line 76 of file Executor.hxx.

Referenced by Executor(), isNotFinished(), RunA(), RunB(), runWlm(), and suspendASAP().

◆ _trace

std::ofstream YACS::ENGINE::Executor::_trace
protected

Definition at line 93 of file Executor.hxx.

Referenced by RunB(), runWlm(), and traceExec().


The documentation for this class was generated from the following files: