Version: 9.15.0
ForEachLoop.cxx
Go to the documentation of this file.
1 // Copyright (C) 2006-2025 CEA, EDF
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19 
20 #include "ForEachLoop.hxx"
21 #include "TypeCode.hxx"
22 #include "Visitor.hxx"
23 #include "ComposedNode.hxx"
24 #include "Executor.hxx"
25 #include "AutoLocker.hxx"
26 
27 #include <iostream>
28 #include <iomanip>
29 #include <sstream>
30 #include <algorithm> // std::replace_if
31 #include <functional> // std::bind
32 
33 //#define _DEVDEBUG_
34 #include "YacsTrace.hxx"
35 
36 #ifdef WIN32
37 #include <functional>
38 #endif
39 
40 using namespace YACS::ENGINE;
41 using namespace std;
42 
49 const char FakeNodeForForEachLoop::NAME[]="thisIsAFakeNode";
50 
51 const char SplitterNode::NAME_OF_SEQUENCE_INPUT[]="SmplsCollection";
52 
53 const char ForEachLoopGen::NAME_OF_SPLITTERNODE[]="splitter";
54 
56 
57 const char ForEachLoopGen::INTERCEPTOR_STR[]="_interceptor";
58 
59 InterceptorInputPort::InterceptorInputPort(const std::string& name, Node *node, TypeCode* type):AnyInputPort(name,node,type),
60  DataPort(name,node,type),Port(node),
61  _repr(0)
62 {
63 }
64 
65 InterceptorInputPort::InterceptorInputPort(const InterceptorInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),
66  Port(other,newHelder),
67  _repr(0)
68 {
69 }
70 
71 void InterceptorInputPort::getAllRepresentants(std::set<InPort *>& repr) const
72 {
73  set<InPort *> ports=_repr->edSetInPort();
74  for(set<InPort *>::iterator iter=ports.begin();iter!=ports.end();iter++)
75  (*iter)->getAllRepresentants(repr);
76 }
77 
79 {
80  return new InterceptorInputPort(*this,newHelder);
81 }
82 
84 {
85  _repr=repr;
86 }
87 
89 {
90  return (--_cnt==0);
91 }
92 
94 {
95  _cnt++;
96 }
97 
98 AnySplitOutputPort::AnySplitOutputPort(const std::string& name, Node *node, TypeCode *type):OutputPort(name,node,type),
99  DataPort(name,node,type),Port(node),
100  _repr(0),_intercptr(0),_cnt(1)
101 {
102 }
103 
105  DataPort(other,newHelder),
106  Port(other,newHelder),
107  _repr(0),_intercptr(0),_cnt(1)
108 {
109 }
110 
112 {
113  bool ret=OutputPort::addInPort(inPort);
114  if(_repr)
116  return ret;
117 }
118 
119 void AnySplitOutputPort::getAllRepresented(std::set<OutPort *>& represented) const
120 {
121  if(!_repr)
122  OutPort::getAllRepresented(represented);
123  else
124  _repr->getAllRepresented(represented);
125 }
126 
127 int AnySplitOutputPort::removeInPort(InPort *inPort, bool forward)
128 {
129  bool ret=OutputPort::removeInPort(inPort,forward);
130  if(_repr)
131  if(_setOfInputPort.empty())
132  _repr->removeInPort(_intercptr,forward);
133  return ret;
134 }
135 
137 {
138  _repr=repr;
139  _intercptr=intercptr;
140 }
141 
143 {
144  return new AnySplitOutputPort(*this,newHelder);
145 }
146 
147 SeqAnyInputPort::SeqAnyInputPort(const std::string& name, Node *node, TypeCodeSeq* type):AnyInputPort(name,node,type),DataPort(name,node,type),Port(node)
148 {
149  _type->decrRef();
150 }
151 
152 SeqAnyInputPort::SeqAnyInputPort(const SeqAnyInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),Port(other,newHelder)
153 {
154 }
155 
157 {
158  return new SeqAnyInputPort(*this,newHelder);
159 }
160 
162 {
163  const SequenceAny * valCsted=(const SequenceAny *) _value;
164  if (valCsted) return valCsted->size();
165  return 0;
166 }
167 
169 {
170  const SequenceAny * valCsted=(const SequenceAny *) _value;
171  AnyPtr ret=(*valCsted)[i];
172  ret->incrRef();
173  return ret;
174 }
175 
177 {
178  stringstream xmldump;
179  int nbElem = getNumberOfElements();
180  xmldump << "<value><array><data>" << endl;
181  for (int i = 0; i < nbElem; i++)
182  {
183  Any *val = getValueAtRank(i);
184  switch (((YACS::ENGINE::TypeCodeSeq *)edGetType())->contentType()->kind())
185  {
186  case Double:
187  xmldump << "<value><double>" << setprecision(16) << val->getDoubleValue() << "</double></value>" << endl;
188  break;
189  case Int:
190  xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
191  break;
192  case Bool:
193  xmldump << "<value><boolean>" << val->getBoolValue() << "</boolean></value>" << endl;
194  break;
195  case String:
196  xmldump << "<value><string>" << val->getStringValue() << "</string></value>" << endl;
197  break;
198  case Objref:
199  xmldump << "<value><objref>" << ToBase64(val->getStringValue()) << "</objref></value>" << endl;
200  break;
201  default:
202  xmldump << "<value><error> NO_SERIALISATION_AVAILABLE </error></value>" << endl;
203  break;
204  }
205  }
206  xmldump << "</data></array></value>" << endl;
207  return xmldump.str();
208 }
209 
210 SplitterNode::SplitterNode(const std::string& name, TypeCode *typeOfData,
211  ForEachLoopGen *father):ElementaryNode(name),
212  _dataPortToDispatch(NAME_OF_SEQUENCE_INPUT,
213  this,(TypeCodeSeq *)TypeCode::sequenceTc("","",typeOfData))
214 {
215  _father=father;
216 }
217 
219  _dataPortToDispatch(other._dataPortToDispatch,this)
220 {
221 }
222 
223 InputPort *SplitterNode::getInputPort(const std::string& name) const
224 {
225  if(name==NAME_OF_SEQUENCE_INPUT)
226  return (InputPort *)&_dataPortToDispatch;
227  else
228  return ElementaryNode::getInputPort(name);
229 }
230 
231 Node *SplitterNode::simpleClone(ComposedNode *father, bool editionOnly) const
232 {
233  return new SplitterNode(*this,(ForEachLoopGen *)father);
234 }
235 
237 {
239 }
240 
242 {
243 //Nothing : should never be called; if it is, there is a big problem elsewhere...
244 }
245 
246 void SplitterNode::init(bool start)
247 {
248  ElementaryNode::init(start);
250 }
251 
252 void SplitterNode::putSplittedValueOnRankTo(int rankInSeq, int branch, bool first)
253 {
254  Any *valueToDispatch=_dataPortToDispatch.getValueAtRank(rankInSeq);
255  ForEachLoopGen *fatherTyped=(ForEachLoopGen *)_father;
256  fatherTyped->putValueOnBranch(valueToDispatch,branch,first);
257  valueToDispatch->decrRef();
258 }
259 
261  _loop(loop),
262  _normalFinish(normalFinish)
263 {
266 }
267 
269  _normalFinish(false)
270 {
271 }
272 
273 Node *FakeNodeForForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
274 {
275  return new FakeNodeForForEachLoop(*this);
276 }
277 
279 {
281 }
282 
284 {
286 }
287 
289 {
290  if(!_normalFinish)
291  throw Exception("");//only to trigger ABORT on Executor
292  else
294 }
295 
297 {
299 }
300 
302 {
304 }
305 
306 ForEachLoopPassedData::ForEachLoopPassedData(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs):_passedIds(passedIds),_passedOutputs(passedOutputs),_nameOfOutputs(nameOfOutputs)
307 {
308  std::size_t sz(_passedIds.size()),sz1(passedOutputs.size()),sz2(nameOfOutputs.size());
309  if(sz1!=sz2)
310  throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : nameOfOutputs and passedOutputs must have the same size !");
311  for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
312  {
313  const SequenceAny *elt(*it);
314  if(elt)
315  if(sz!=(std::size_t)elt->size())
316  throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : incoherent input of passed data !");
317  }
318  for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
319  {
320  SequenceAny *elt(*it);
321  if(elt)
322  elt->incrRef();
323  }
324 }
325 
327 : _passedIds(copy._passedIds),
328  _passedOutputs(copy._passedOutputs),
329  _nameOfOutputs(copy._nameOfOutputs),
330  _flagsIds(copy._flagsIds)
331 {
332 }
333 
335 {
336  for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
337  {
338  SequenceAny *elt(*it);
339  if(elt)
340  elt->decrRef();
341  }
342 }
343 
345 {
346  _flagsIds.clear();
347 }
348 
350 {
351  if(nbOfElts<0)
352  throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : nb of elts is expected to be > 0 !");
353  std::size_t sizeExp(_passedIds.size()),nbOfElts2(nbOfElts);
354  if(nbOfElts2<sizeExp)
355  throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set !");
356  for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
357  {
358  if((*it)>=nbOfElts2)
359  throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set 2 !");
360  }
361  _flagsIds.resize(nbOfElts);
362  std::fill(_flagsIds.begin(),_flagsIds.end(),false);
363  for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
364  {
365  if(*it<nbOfElts)
366  {
367  if(!_flagsIds[*it])
368  _flagsIds[*it]=true;
369  else
370  {
371  std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : id " << *it << " in list of ids appears more than once !";
372  throw YACS::Exception(oss.str());
373  }
374  }
375  else
376  {
377  std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : Presence of id " << *it << " in list of ids ! Must be in [0," << nbOfElts << ") !";
378  throw YACS::Exception(oss.str());
379  }
380  }
381 }
382 
383 void ForEachLoopPassedData::checkLevel2(const std::vector<AnyInputPort *>& ports) const
384 {
385  std::size_t sz(_nameOfOutputs.size());
386  if(sz!=ports.size())
387  throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : mismatch of size of vectors !");
388  for(std::size_t i=0;i<sz;i++)
389  {
390  AnyInputPort *elt(ports[i]);
391  if(!elt)
392  throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : presence of null instance !");
393  if(_nameOfOutputs[i]!=elt->getName())
394  {
395  std::ostringstream oss; oss << "ForEachLoopPassedData::checkLevel2 : At pos #" << i << " the name is not OK !";
396  throw YACS::Exception(oss.str());
397  }
398  }
399 }
400 
404 int ForEachLoopPassedData::toAbsId(int localId) const
405 {
406  if(localId<0)
407  throw YACS::Exception("ForEachLoopPassedData::toAbsId : local pos must be >= 0 !");
408  int ret(0),curLocId(0);
409  for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
410  {
411  if(!*it)
412  {
413  if(localId==curLocId)
414  return ret;
415  curLocId++;
416  }
417  }
418  throw YACS::Exception("ForEachLoopPassedData::toAbsId : not referenced Id !");
419 }
420 
424 int ForEachLoopPassedData::toAbsIdNot(int localId) const
425 {
426  if(localId<0)
427  throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : local pos must be >= 0 !");
428  int ret(0),curLocId(0);
429  for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
430  {
431  if(*it)//<- diff is here !
432  {
433  if(localId==curLocId)
434  return ret;
435  curLocId++;
436  }
437  }
438  throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : not referenced Id !");
439 }
440 
442 {
443  std::size_t nbAllElts(_flagsIds.size());
444  std::size_t ret(nbAllElts-_passedIds.size());
445  return ret;
446 }
447 
448 void ForEachLoopPassedData::assignAlreadyDone(const std::vector<SequenceAny *>& execVals) const
449 {
450  std::size_t sz(execVals.size());
451  if(_passedOutputs.size()!=sz)
452  throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : mismatch of size of vectors !");
453  for(std::size_t i=0;i<sz;i++)
454  {
456  SequenceAny *eltDestination(execVals[i]);
457  if(!elt)
458  throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : presence of null elt !");
459  unsigned int szOfElt(elt->size());
460  for(unsigned int j=0;j<szOfElt;j++)
461  {
462  AnyPtr elt1((*elt)[j]);
463  int jAbs(toAbsIdNot(j));
464  eltDestination->setEltAtRank(jAbs,elt1);
465  }
466  }
467 }
468 
469 ForEachLoopGen::ForEachLoopGen(const std::string& name, TypeCode *typeOfDataSplitted, std::unique_ptr<NbBranchesAbstract>&& branchManager):
470  DynParaLoop(name,typeOfDataSplitted,std::move(branchManager)),
471  _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
472  _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
473 {
474 }
475 
476 ForEachLoopGen::ForEachLoopGen(const ForEachLoopGen& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
477  _splitterNode(other._splitterNode,this),
478  _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
479 {
480  int i=0;
481  if(!editionOnly)
482  for(vector<AnySplitOutputPort *>::const_iterator iter2=other._outGoingPorts.begin();iter2!=other._outGoingPorts.end();iter2++,i++)
483  {
484  AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
486  temp->addRepr(getOutPort(other.getOutPortName((*iter2)->getRepr())),interc);
487  interc->setRepr(temp);
488  _outGoingPorts.push_back(temp);
489  _intecptrsForOutGoingPorts.push_back(interc);
490  }
491 }
492 
494 {
495  cleanDynGraph();
496  for(vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
497  delete *iter;
498  for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
499  delete *iter2;
500  delete _passedData;
501 }
502 
503 void ForEachLoopGen::init(bool start)
504 {
505  DynParaLoop::init(start);
506  _splitterNode.init(start);
507  _execCurrentId=0;
508  cleanDynGraph();
509  _currentIndex = 0;
511  if(_passedData)
512  _passedData->init();
513 }
514 
516 {
517  DEBTRACE("ForEachLoopGen::exUpdateState");
518  if(_state == YACS::DISABLED)
519  return;
520  if(_state == YACS::DONE)
521  return;
522  if(_inGate.exIsReady())
523  {
524  //internal graph update
525  int i;
526  int nbOfElts(_splitterNode.getNumberOfElements()),nbOfEltsDone(0);
527  int nbOfBr(_nbOfBranches->getNumberOfBranches(nbOfElts));
528  if(_passedData)
529  {
531  nbOfEltsDone=_passedData->getNumberOfEltsAlreadyDone();
532  }
533  int nbOfEltsToDo(nbOfElts-nbOfEltsDone);
534 
535  DEBTRACE("nbOfElts=" << nbOfElts);
536  DEBTRACE("nbOfBr=" << nbOfBr);
537  INFO_YACSTRACE("nbOfBr (" << getName() << ") = " << nbOfBr);
538 
539  if(nbOfEltsToDo==0)
540  {
542  delete _nodeForSpecialCases;
545  return ;
546  }
547  if(nbOfBr<=0)
548  {
549  delete _nodeForSpecialCases;
552  return ;
553  }
554  if(nbOfBr>nbOfEltsToDo)
555  nbOfBr=nbOfEltsToDo;
556  _execNodes.resize(nbOfBr);
557  _execIds.resize(nbOfBr);
558  _execOutGoingPorts.resize(nbOfBr);
559  prepareSequenceValues(nbOfElts);
560  if(_initNode)
561  _execInitNodes.resize(nbOfBr);
563  if (_finalizeNode)
564  _execFinalizeNodes.resize(nbOfBr);
565 
566  vector<Node *> origNodes;
567  origNodes.push_back(_initNode);
568  origNodes.push_back(_node);
569  origNodes.push_back(_finalizeNode);
570 
571  //Conversion exceptions can be thrown by createOutputOutOfScopeInterceptors
572  //so catch them to control errors
573  try
574  {
575  for(i=0;i<nbOfBr;i++)
576  {
577  DEBTRACE( "-------------- 2" );
578  vector<Node *> clonedNodes = cloneAndPlaceNodesCoherently(origNodes);
579  if(_initNode)
580  _execInitNodes[i] = clonedNodes[0];
581  _execNodes[i] = clonedNodes[1];
582  if(_finalizeNode)
583  _execFinalizeNodes[i] = clonedNodes[2];
584  DEBTRACE( "-------------- 4" );
586  DEBTRACE( "-------------- 5" );
588  DEBTRACE( "-------------- 6" );
589  }
590  for(i=0;i<nbOfBr;i++)
591  {
592  DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
594  int posInAbs(_execCurrentId);
595  if(_passedData)
598  _execCurrentId++;
599  DEBTRACE( "-------------- 7" );
600  }
601  if(_passedData)
602  {
605  }
606  // clean inputs data coming from the outside in _node
607  set< InPort * > portsToSetVals=getAllInPortsComingFromOutsideOfCurrentScope();
608  for(auto iter : portsToSetVals)
609  {
610  InputPort *curPortCasted=(InputPort *) iter;//Cast granted by ForEachLoopGen::buildDelegateOf(InPort)
611  if(!curPortCasted->canSafelySqueezeMemory())// this can appear strange ! if not safelySqueeze -> release. Nevertheless it is true.
612  curPortCasted->releaseData(); // these input ports have been incremented with InputPort::put into DynParaLoop::prepareInputsFromOutOfScope. So they can be released now.
613  }
614  }
615  catch(YACS::Exception& ex)
616  {
617  //ForEachLoop must be put in error and the exception rethrown to notify the caller
618  DEBTRACE( "ForEachLoopGen::exUpdateState: " << ex.what() );
620  setErrorDetails(ex.what());
621  exForwardFailed();
622  throw;
623  }
624 
625  setState(YACS::ACTIVATED); // move the calling of setState method there for adding observers for clone nodes in GUI part
626 
627  //let's go
628  for(i=0;i<nbOfBr;i++)
629  if(_initNode)
630  {
631  _execInitNodes[i]->exUpdateState();
633  }
634  else
635  {
637  _execNodes[i]->exUpdateState();
638  }
639 
641  }
642 }
643 
645 {
646  // emit notification to all observers registered with the dispatcher on any change of the node's state
647  sendEvent("progress");
648 }
649 
650 void ForEachLoopGen::getReadyTasks(std::vector<Task *>& tasks)
651 {
652  if(!_node)
653  return;
656  {
658  {
660  return ;
661  }
662  vector<Node *>::iterator iter;
663  for (iter=_execNodes.begin() ; iter!=_execNodes.end() ; iter++)
664  (*iter)->getReadyTasks(tasks);
665  for (iter=_execInitNodes.begin() ; iter!=_execInitNodes.end() ; iter++)
666  (*iter)->getReadyTasks(tasks);
667  for (iter=_execFinalizeNodes.begin() ; iter!=_execFinalizeNodes.end() ; iter++)
668  (*iter)->getReadyTasks(tasks);
669  }
670 }
671 
673 {
675 }
676 
678 {
679  //TO DO
680 }
681 
682 void ForEachLoopGen::selectRunnableTasks(std::vector<Task *>& tasks)
683 {
684 }
685 
686 std::list<InputPort *> ForEachLoopGen::getSetOfInputPort() const
687 {
688  list<InputPort *> ret=DynParaLoop::getSetOfInputPort();
689  ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
690  return ret;
691 }
692 
693 std::list<InputPort *> ForEachLoopGen::getLocalInputPorts() const
694 {
695  list<InputPort *> ret=DynParaLoop::getLocalInputPorts();
696  ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
697  return ret;
698 }
699 
700 InputPort *ForEachLoopGen::getInputPort(const std::string& name) const
701 {
704  else
705  return DynParaLoop::getInputPort(name);
706 }
707 
708 OutputPort *ForEachLoopGen::getOutputPort(const std::string& name) const
709 {
710  for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
711  {
712  if(name==(*iter)->getName())
713  return (OutputPort *)(*iter);
714  }
715  return DynParaLoop::getOutputPort(name);
716 }
717 
718 OutPort *ForEachLoopGen::getOutPort(const std::string& name) const
719 {
720  for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
721  {
722  if(name==(*iter)->getName())
723  return (OutPort *)(*iter);
724  }
725  return DynParaLoop::getOutPort(name);
726 }
727 
728 Node *ForEachLoopGen::getChildByShortName(const std::string& name) const
729 {
730  if(name==NAME_OF_SPLITTERNODE)
731  return (Node *)&_splitterNode;
732  else
734 }
735 
737 
744 {
745  DEBTRACE("updateStateOnFinishedEventFrom " << node->getName() << " " << node->getState());
746  unsigned int id;
747  switch(getIdentityOfNotifyerNode(node,id))
748  {
749  case INIT_NODE:
751  case WORK_NODE:
752  return updateStateForWorkNodeOnFinishedEventFrom(node,id,true);
753  case FINALIZE_NODE:
755  default:
756  YASSERT(false);
757  }
758  return YACS::NOEVENT;
759 }
760 
762 {
763  _execNodes[id]->exUpdateState();
766  _currentIndex++;
767  if (_initializingCounter == 0)
769  return YACS::NOEVENT;
770 }
771 
776 {
777  _currentIndex++;
779  if(isNormalFinish)
780  {
781  int globalId(_execIds[id]);
782  if(_passedData)
783  globalId=_passedData->toAbsId(globalId);
784  sendEvent2("progress_ok",&globalId);
786  }
787  else
788  {
789  int globalId(_execIds[id]);
790  if(_passedData)
791  globalId=_passedData->toAbsId(globalId);
792  sendEvent2("progress_ko",&globalId);
793  }
794  //
796  {//No more elements of _dataPortToDispatch to treat
798  //analyzing if some samples are still on treatment on other branches.
799  bool isFinished(true);
800  for(int i=0;i<_execIds.size() && isFinished;i++)
802  if(isFinished)
803  {
804  try
805  {
806  if(_failedCounter!=0)
807  {// case of keepgoing mode + a failed
808  std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !";
809  DEBTRACE("ForEachLoopGen::updateStateOnFinishedEventFrom : "<< oss.str());
811  return YACS::ABORT;
812  }
814 
815  if (_node)
816  {
818 
819  ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
820  if (compNode)
821  {
822  std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
823  std::list<Node *>::iterator iter=aChldn.begin();
824  for(;iter!=aChldn.end();iter++)
825  (*iter)->setState(YACS::DONE);
826  }
827  }
828 
829  if (_finalizeNode == NULL)
830  {
831  // No finalize node, we just finish the loop at the end of exec nodes execution
833  return YACS::FINISH;
834  }
835  else
836  {
837  // Run the finalize nodes, the loop will be done only when they all finish
838  _unfinishedCounter = 0; // This counter indicates how many branches are not finished
839  for (int i=0 ; i<_execIds.size() ; i++)
840  {
842  DEBTRACE("Launching finalize node for branch " << i);
843  _execFinalizeNodes[i]->exUpdateState();
845  }
846  return YACS::NOEVENT;
847  }
848  }
849  catch(YACS::Exception& ex)
850  {
851  DEBTRACE("ForEachLoopGen::updateStateOnFinishedEventFrom: "<<ex.what());
852  //no way to push results : put following nodes in FAILED state
853  //TODO could be more fine grain : put only concerned nodes in FAILED state
854  exForwardFailed();
856  return YACS::ABORT;
857  }
858  }
859  }
860  else if(_state == YACS::ACTIVATED)
861  {//more elements to do and loop still activated
863  int posInAbs(_execCurrentId);
864  if(_passedData)
866  _splitterNode.putSplittedValueOnRankTo(posInAbs,id,false);
867  //forwardExecStateToOriginalBody(node);
868  node->init(false);
869  _execCurrentId++;
870  node->exUpdateState();
871  //forwardExecStateToOriginalBody(node);
873  }
874  else
875  {//elements to process and loop no more activated
876  DEBTRACE("foreach loop state " << _state);
877  }
878  return YACS::NOEVENT;
879 }
880 
882 {
883  DEBTRACE("Finalize node finished on branch " << id);
885  _currentIndex++;
887  DEBTRACE(_unfinishedCounter << " finalize nodes still running");
888  if (_unfinishedCounter == 0)
889  {
892  return YACS::FINISH;
893  }
894  else
895  return YACS::NOEVENT;
896 }
897 
899 {
900  unsigned int id;
902  // TODO: deal with keepgoing without the dependency to Executor
903  if(ton!=WORK_NODE || !execInst->getKeepGoingProperty())
904  return DynParaLoop::updateStateOnFailedEventFrom(node,execInst);
905  else
906  {
907  _failedCounter++;
908  return updateStateForWorkNodeOnFinishedEventFrom(node,id,false);
909  }
910 }
911 
912 void ForEachLoopGen::InterceptorizeNameOfPort(std::string& portName)
913 {
914  std::replace_if(portName.begin(), portName.end(), std::bind(std::equal_to<char>(), '.',std::placeholders::_1), '_');
915  portName += INTERCEPTOR_STR;
916 }
917 
918 void ForEachLoopGen::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
919 {
920  DynParaLoop::buildDelegateOf(port,finalTarget,pointsOfView);
921  string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
922  if(typeOfPortInstance==OutputPort::NAME)
923  {
924  vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
925  int i=0;
926  for(;iter!=_outGoingPorts.end();iter++,i++)
927  if((*iter)->getRepr()==port.first || *iter==port.first)
928  break;
929  if(iter!=_outGoingPorts.end())
930  {
931  if(*iter!=port.first)
932  {
933  (*iter)->incrRef();
934  (*iter)->addRepr(port.first,_intecptrsForOutGoingPorts[i]);
935  }
936  port.first=*iter;
937  }
938  else
939  {
940  TypeCode *tcTrad((YACS::ENGINE::TypeCode*)finalTarget->edGetType()->subContentType(getFEDeltaBetween(port.first,finalTarget)));
941  TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",tcTrad);
942  // The out going ports belong to the ForEachLoop, whereas
943  // the delegated port belongs to a node child of the ForEachLoop.
944  // The name of the delegated port contains dots (bloc.node.outport),
945  // whereas the name of the out going port shouldn't do.
946  std::string outputPortName(getPortName(port.first));
947  InterceptorizeNameOfPort(outputPortName);
948  AnySplitOutputPort *newPort(new AnySplitOutputPort(outputPortName,this,newTc));
949  InterceptorInputPort *intercptor(new InterceptorInputPort(outputPortName + "_in",this,tcTrad));
950  intercptor->setRepr(newPort);
951  newTc->decrRef();
952  newPort->addRepr(port.first,intercptor);
953  _outGoingPorts.push_back(newPort);
954  _intecptrsForOutGoingPorts.push_back(intercptor);
955  port.first=newPort;
956  }
957  }
958  else
959  throw Exception("ForEachLoopGen::buildDelegateOf : not implemented for DS because not specified");
960 }
961 
962 void ForEachLoopGen::getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
963 {
964  string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
965  if(typeOfPortInstance==OutputPort::NAME)
966  {
967  vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
968  for(;iter!=_outGoingPorts.end();iter++)
969  if((*iter)->getRepr()==port.first)
970  break;
971  if(iter==_outGoingPorts.end())
972  {
973  string what("ForEachLoopGen::getDelegateOf : Port with name "); what+=port.first->getName(); what+=" not exported by ForEachLoop "; what+=_name;
974  throw Exception(what);
975  }
976  else
977  port.first=(*iter);
978  }
979  else
980  throw Exception("ForEachLoopGen::getDelegateOf : not implemented because not specified");
981 }
982 
983 void ForEachLoopGen::releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
984 {
985  string typeOfPortInstance=portDwn->getNameOfTypeOfCurrentInstance();
986  if(typeOfPortInstance==OutputPort::NAME)
987  {
988  vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
989  vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();
990  for(;iter!=_outGoingPorts.end();iter++,iter2++)
991  if((*iter)->getRepr()==portDwn)
992  break;
993  //ASSERT(portUp==*iter.second)
994  if((*iter)->decrRef())
995  {
996  AnySplitOutputPort *p=*iter;
997  _outGoingPorts.erase(iter);
998  delete p;
999  InterceptorInputPort *ip=*iter2;
1000  _intecptrsForOutGoingPorts.erase(iter2);
1001  delete ip;
1002  }
1003  }
1004 }
1005 
1006 OutPort *ForEachLoopGen::getDynOutPortByAbsName(int branchNb, const std::string& name)
1007 {
1008  string portName, nodeName;
1009  splitNamesBySep(name,Node::SEP_CHAR_IN_PORT,nodeName,portName,false);
1010  Node *staticChild = getChildByName(nodeName);
1011  return _execNodes[branchNb]->getOutPort(portName);//It's impossible(garanteed by YACS::ENGINE::ForEachLoopGen::buildDelegateOf)
1012  //that a link starting from _initNode goes out of scope of 'this'.
1013 }
1014 
1016 {
1018  for(vector< SequenceAny *>::iterator iter3=_execVals.begin();iter3!=_execVals.end();iter3++)
1019  (*iter3)->decrRef();
1020  _execVals.clear();
1021  for(vector< vector<AnyInputPort *> >::iterator iter4=_execOutGoingPorts.begin();iter4!=_execOutGoingPorts.end();iter4++)
1022  for(vector<AnyInputPort *>::iterator iter5=(*iter4).begin();iter5!=(*iter4).end();iter5++)
1023  delete *iter5;
1024  _execOutGoingPorts.clear();
1025 }
1026 
1028 {
1029  vector<AnyInputPort *>::iterator iter;
1030  int i=0;
1031  for(iter=_execOutGoingPorts[branchNb].begin();iter!=_execOutGoingPorts[branchNb].end();iter++,i++)
1032  {
1033  Any *val=(Any *)(*iter)->getValue();
1034  _execVals[i]->setEltAtRank(rank,val);
1035  }
1036 }
1037 
1039 {
1040  if(!_passedData)
1042  else
1044 }
1045 
1047 {
1048  _execVals.resize(_outGoingPorts.size());
1049  vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1050  for(int i=0;iter!=_outGoingPorts.end();iter++,i++)
1051  _execVals[i]=SequenceAny::New((*iter)->edGetType()->contentType(),sizeOfSamples);
1052 }
1053 
1055 {
1056  vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1057  int i=0;
1058  for(;iter!=_outGoingPorts.end();iter++,i++)
1059  (*iter)->put((const void *)_execVals[i]);
1060 }
1061 
1063 {
1064  vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1065  int i=0;
1066  for(;iter!=_outGoingPorts.end();iter++,i++)
1067  {
1068  DEBTRACE( (*iter)->getName() << " " << (*iter)->edGetType()->kind() );
1069  //AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,(*iter)->edGetType());
1070  OutPort *portOut=getDynOutPortByAbsName(branchNb,getOutPortName(((*iter)->getRepr())));
1071  DEBTRACE( portOut->getName() );
1072  TypeCode *tc((TypeCode *)(*iter)->edGetType()->contentType());
1073  AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,tc);
1074  portOut->addInPort(interceptor);
1075  _execOutGoingPorts[branchNb].push_back(interceptor);
1076  }
1077 }
1078 
1079 void ForEachLoopGen::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
1080  InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd)
1081 {
1082  DynParaLoop::checkLinkPossibility(start, pointsOfViewStart, end, pointsOfViewEnd);
1083  if(end->getNode() == &_splitterNode)
1084  throw Exception("Illegal link within a foreach loop: \
1085 the 'SmplsCollection' port cannot be linked within the scope of the loop.");
1086  if(end == _nbOfBranches->getPort())
1087  throw Exception("Illegal link within a foreach loop: \
1088 the 'nbBranches' port cannot be linked within the scope of the loop.");
1089 }
1090 
1091 std::list<OutputPort *> ForEachLoopGen::getLocalOutputPorts() const
1092 {
1093  list<OutputPort *> ret;
1094  ret.push_back(getOutputPort(NAME_OF_SPLITTED_SEQ_OUT));
1095  return ret;
1096 }
1097 
1099 
1102 void ForEachLoopGen::writeDot(std::ostream &os) const
1103 {
1104  os << " subgraph cluster_" << getId() << " {\n" ;
1105  //only one node in a loop
1106  if(_node)
1107  {
1108  _node->writeDot(os);
1109  os << getId() << " -> " << _node->getId() << ";\n";
1110  }
1111  os << "}\n" ;
1112  os << getId() << "[fillcolor=\"" ;
1114  os << getColorState(state);
1115  os << "\" label=\"" << "Loop:" ;
1116  os << getName() <<"\"];\n";
1117 }
1118 
1121 {
1122  if(level==0)return;
1123  DynParaLoop::resetState(level);
1124  _execCurrentId=0;
1125  cleanDynGraph();
1126 }
1127 
1128 std::string ForEachLoopGen::getProgress() const
1129 {
1130  int nbElems(getNbOfElementsToBeProcessed());
1131  std::stringstream aProgress;
1132  if (nbElems > 0)
1133  aProgress << _currentIndex << "/" << nbElems;
1134  else
1135  aProgress << "0";
1136  return aProgress.str();
1137 }
1138 
1140 
1144 list<ProgressWeight> ForEachLoopGen::getProgressWeight() const
1145 {
1146  list<ProgressWeight> ret;
1147  list<Node *> setOfNode=edGetDirectDescendants();
1148  int elemDone=getCurrentIndex();
1149  int elemTotal=getNbOfElementsToBeProcessed();
1150  for(list<Node *>::const_iterator iter=setOfNode.begin();iter!=setOfNode.end();iter++)
1151  {
1152  list<ProgressWeight> myCurrentSet=(*iter)->getProgressWeight();
1153  for(list<ProgressWeight>::iterator iter=myCurrentSet.begin();iter!=myCurrentSet.end();iter++)
1154  {
1155  (*iter).weightDone=((*iter).weightTotal) * elemDone;
1156  (*iter).weightTotal*=elemTotal;
1157  }
1158  ret.insert(ret.end(),myCurrentSet.begin(),myCurrentSet.end());
1159  }
1160  return ret;
1161 }
1162 
1164 {
1165  int nbOfElems(_splitterNode.getNumberOfElements());
1166  int nbBranches = _nbOfBranches->getNumberOfBranches(nbOfElems);
1167  return nbOfElems
1168  + (_initNode ? nbBranches:0)
1169  + (_finalizeNode ? nbBranches:0) ;
1170 }
1171 
// Fills `outputs` and `nameOfOutputs` with one entry per element of _execVals:
// outputs[i] receives _execVals[i]->removeUnsetItemsFromThis() and
// nameOfOutputs[i] the name of port i of branch 0 (_execOutGoingPorts[0]).
// Returns _execVals[0]->getSetItems(), or an empty vector (leaving both output
// arguments untouched) when _execVals or _execOutGoingPorts is empty.
// NOTE(review): the loop indexes `ports` with the size of _execVals; this
// presumes both containers have the same length - confirm against the code
// that fills them.
1186 std::vector<unsigned int> ForEachLoopGen::getPassedResults(Executor *execut, std::vector<SequenceAny *>& outputs, std::vector<std::string>& nameOfOutputs) const
1187 {
// NOTE(review): listing line 1188 is absent from this view and `execut` is not
// used by any visible line; a statement using it may be missing here - check
// against the real source file before editing this function.
1189 if(_execVals.empty())
1190 return std::vector<unsigned int>();
1191 if(_execOutGoingPorts.empty())
1192 return std::vector<unsigned int>();
1193 std::size_t sz(_execVals.size());
1194 outputs.resize(sz);
1195 nameOfOutputs.resize(sz);
// Port names are taken from the first branch only.
1196 const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
1197 for(std::size_t i=0;i<sz;i++)
1198 {
1199 outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
1200 nameOfOutputs[i]=ports[i]->getName();
1201 }
1202 return _execVals[0]->getSetItems();
1203 }
1204 
1209 void ForEachLoopGen::assignPassedResults(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs)
1210 {
1211  delete _passedData;
1212  _failedCounter=0;
1213  _passedData=new ForEachLoopPassedData(passedIds,passedOutputs,nameOfOutputs);
1214 }
1215 
1217 {
1218  Node *ns(start->getNode()),*ne(end->getNode());
1220  int ret(0);
1221  Node *work(ns);
1222  while(work!=co)
1223  {
1224  ForEachLoopGen *isFE(dynamic_cast<ForEachLoopGen *>(work));
1225  if(isFE)
1226  ret++;
1227  work=work->getFather();
1228  }
1229  if(dynamic_cast<AnySplitOutputPort *>(start))
1230  ret--;
1231  return ret;
1232 }
1233 
1239 {
1240  std::vector<SequenceAny *> outputs;
1241  std::vector<std::string> nameOfOutputs;
1242  if(_execVals.empty() || _execOutGoingPorts.empty())
1243  return new ForEachLoopPassedData(std::vector<unsigned int>(), outputs, nameOfOutputs);
1244  std::size_t sz(_execVals.size());
1245  outputs.resize(sz);
1246  nameOfOutputs.resize(sz);
1247  const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
1248  for(std::size_t i=0;i<sz;i++)
1249  {
1250  outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
1251  nameOfOutputs[i]=ports[i]->getName();
1252  }
1253  return new ForEachLoopPassedData(_execVals[0]->getSetItems(), outputs, nameOfOutputs);
1254 }
1255 
1257 {
1258  if(_passedData)
1259  delete _passedData;
1260  _passedData = processedData;
1261 }
1262 
1266 const YACS::ENGINE::TypeCode* ForEachLoopGen::getOutputPortType(const std::string& portName)const
1267 {
1268  const YACS::ENGINE::TypeCode* ret=NULL;
1269  vector<AnySplitOutputPort *>::const_iterator it;
1270  for(it=_outGoingPorts.begin();it!=_outGoingPorts.end() && ret==NULL;it++)
1271  {
1272  std::string originalPortName(getPortName(*it));
1273  //InterceptorizeNameOfPort(originalPortName);
1274  DEBTRACE("ForEachLoopGen::getOutputPortType compare " << portName << " == " << originalPortName);
1275  if(originalPortName == portName)
1276  {
1277  ret = (*it)->edGetType()->contentType();
1278  }
1279  }
1280  return ret;
1281 }
1282 
1283 Node *ForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
1284 {
1285  return new ForEachLoop(*this,father,editionOnly);
1286 }
1287 
1289 {
1290  visitor->visitForEachLoop(this);
1291 }
1292 
1294 {
1295  visitor->visitForEachLoopDyn(this);
1296 }
1297 
1298 Node *ForEachLoopDyn::simpleClone(ComposedNode *father, bool editionOnly) const
1299 {
1300  return new ForEachLoopDyn(*this,father,editionOnly);
1301 }
#define YASSERT(val)
The YASSERT macro is always defined; it is used like assert, but throws a YACS::Exception instead of aborting.
Definition: YacsTrace.hxx:59
#define INFO_YACSTRACE(msg)
Definition: YacsTrace.hxx:51
#define DEBTRACE(msg)
Definition: YacsTrace.hxx:31
: Allow to manage memory of instances of T. The only constraint on T is to have method incrRef and De...
Definition: SharedPtr.hxx:30
void getAllRepresented(std::set< OutPort * > &represented) const
bool addInPort(InPort *inPort)
void addRepr(OutPort *repr, InterceptorInputPort *intercptr)
int removeInPort(InPort *inPort, bool forward)
InterceptorInputPort * _intercptr
Definition: ForEachLoop.hxx:60
OutputPort * clone(Node *newHelder) const
AnySplitOutputPort(const std::string &name, Node *node, TypeCode *type)
Definition: ForEachLoop.cxx:98
: Interface for management of storage of data formatted dynamically in its TypeCode....
Definition: Any.hxx:79
virtual std::string getStringValue() const =0
virtual int getIntValue() const =0
virtual double getDoubleValue() const =0
virtual bool getBoolValue() const =0
Base class for all composed nodes.
Node * getChildByName(const std::string &name) const
std::string getName() const
virtual std::list< Node * > getAllRecursiveConstituents()
Idem getAllRecursiveNodes, but this node is NOT included.
std::set< OutPort * > getAllOutPortsLeavingCurrentScope() const
List all output ports of children nodes that are linked to out of scope input ports.
std::string getPortName(const PORT *port) const
static bool splitNamesBySep(const std::string &globalName, const char separator[], std::string &firstPart, std::string &lastPart, bool priority)
Splits name globalName in 2 parts using separator.
std::string getOutPortName(const OutPort *) const
virtual void resetState(int level)
Reset the state of the node and its children depending on the parameter level.
std::set< InPort * > getAllInPortsComingFromOutsideOfCurrentScope() const
List all input ports that are linked to out of scope ports.
static ComposedNode * getLowestCommonAncestor(Node *node1, Node *node2)
Retrieves the lowest common ancestor of 2 nodes.
std::string getNameOfTypeOfCurrentInstance() const
Definition: DataPort.cxx:56
TypeCode * edGetType() const
Definition: DataPort.hxx:53
std::string getName() const
Definition: DataPort.hxx:55
Base class for dynamically (fully or semifully) built graphs.
Definition: DynParaLoop.hxx:41
Node * getChildByShortName(const std::string &name) const
InputPort * getInputPort(const std::string &name) const
Get an input port given its name.
int getNumberOfInputPorts() const
virtual void checkLinkPossibility(OutPort *start, const std::list< ComposedNode * > &pointsOfViewStart, InPort *end, const std::list< ComposedNode * > &pointsOfViewEnd)
std::unique_ptr< NbBranchesAbstract > _nbOfBranches
Definition: DynParaLoop.hxx:56
std::vector< Node * > _execNodes
Definition: DynParaLoop.hxx:58
static const char NAME_OF_SPLITTED_SEQ_OUT[]
Definition: DynParaLoop.hxx:67
std::vector< Node * > cloneAndPlaceNodesCoherently(const std::vector< Node * > &origNodes)
Clone nodes and make their placement consistent with the placement of the original ones.
void buildDelegateOf(InPort *&port, OutPort *initialStart, const std::list< ComposedNode * > &pointsOfView)
OutputPort * getOutputPort(const std::string &name) const
Get an output port given its name.
OutPort * getOutPort(const std::string &name) const
std::vector< int > _execIds
Definition: DynParaLoop.hxx:54
std::list< Node * > edGetDirectDescendants() const
virtual YACS::Event updateStateOnFailedEventFrom(Node *node, const Executor *execInst)
Method used to notify the node that a child node has failed.
std::list< InputPort * > getLocalInputPorts() const
redefined on derived class of ComposedNode. by default a ComposedNode has no port by itself
void init(bool start=true)
std::vector< Node * > _execFinalizeNodes
Definition: DynParaLoop.hxx:60
void prepareInputsFromOutOfScope(int branchNb)
virtual void cleanDynGraph()
virtual void forwardExecStateToOriginalBody(Node *execNode)
void putValueOnBranch(Any *val, unsigned branchId, bool first)
std::vector< Node * > _execInitNodes
Definition: DynParaLoop.hxx:59
std::list< InputPort * > getSetOfInputPort() const
TypeOfNode getIdentityOfNotifyerNode(const Node *node, unsigned &id)
Base class for all calculation nodes.
InputPort * getInputPort(const std::string &name) const
void init(bool start=true)
void getReadyTasks(std::vector< Task * > &tasks)
Threaded Executor.
Definition: Executor.hxx:63
YACS::BASES::Mutex & getTheMutexForSchedulerUpdate()
Definition: Executor.hxx:133
bool getKeepGoingProperty() const
Definition: Executor.hxx:107
FakeNodeForForEachLoop(ForEachLoopGen *loop, bool normalFinish)
Node * simpleClone(ComposedNode *father, bool editionOnly) const
Node * simpleClone(ComposedNode *father, bool editionOnly=true) const
void accept(Visitor *visitor)
ForEachLoopDyn(const std::string &name, TypeCode *typeOfDataSplitted)
InputPort * getInputPort(const std::string &name) const
Get an input port given its name.
std::vector< SequenceAny * > _execVals
static const char INTERCEPTOR_STR[]
OutPort * getOutPort(const std::string &name) const
void exUpdateState()
Update the node state.
Node * getChildByShortName(const std::string &name) const
void init(bool start=true)
void assignPassedResults(const std::vector< unsigned int > &passedIds, const std::vector< SequenceAny * > &passedOutputs, const std::vector< std::string > &nameOfOutputs)
void storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
void setProcessedData(ForEachLoopPassedData *processedData)
std::vector< AnySplitOutputPort * > _outGoingPorts
ForEachLoopGen(const std::string &name, TypeCode *typeOfDataSplitted, std::unique_ptr< NbBranchesAbstract > &&branchManager)
virtual void resetState(int level)
Reset the state of the node and its children depending on the parameter level.
YACS::Event updateStateForInitNodeOnFinishedEventFrom(Node *node, unsigned int id)
std::list< InputPort * > getSetOfInputPort() const
static const int NOT_RUNNING_BRANCH_ID
void checkLinkPossibility(OutPort *start, const std::list< ComposedNode * > &pointsOfViewStart, InPort *end, const std::list< ComposedNode * > &pointsOfViewEnd)
std::vector< InterceptorInputPort * > _intecptrsForOutGoingPorts
ports linked to node outside the current scope
void createOutputOutOfScopeInterceptors(int branchNb)
const TypeCode * getOutputPortType(const std::string &portName) const
void writeDot(std::ostream &os) const
Dump the node state to a stream.
void releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list< ComposedNode * > &pointsOfView)
std::string getProgress() const
void buildDelegateOf(std::pair< OutPort *, OutPort * > &port, InPort *finalTarget, const std::list< ComposedNode * > &pointsOfView)
std::list< InputPort * > getLocalInputPorts() const
redefined on derived class of ComposedNode. by default a ComposedNode has no port by itself
ForEachLoopPassedData * getProcessedData() const
static int getFEDeltaBetween(OutPort *start, InPort *end)
void getDelegateOf(std::pair< OutPort *, OutPort * > &port, InPort *finalTarget, const std::list< ComposedNode * > &pointsOfView)
FakeNodeForForEachLoop * _nodeForSpecialCases
std::vector< unsigned int > getPassedResults(Executor *execut, std::vector< SequenceAny * > &outputs, std::vector< std::string > &nameOfOutputs) const
std::vector< std::vector< AnyInputPort * > > _execOutGoingPorts
int getNbOfElementsToBeProcessed() const
void selectRunnableTasks(std::vector< Task * > &tasks)
ForEachLoopPassedData * _passedData
static void InterceptorizeNameOfPort(std::string &portName)
std::list< OutputPort * > getLocalOutputPorts() const
redefined on derived class of ComposedNode. by default a ComposedNode has no port by itself
void checkNoCyclePassingThrough(Node *node)
void getReadyTasks(std::vector< Task * > &tasks)
void prepareSequenceValues(int sizeOfSamples)
YACS::Event updateStateOnFailedEventFrom(Node *node, const Executor *execInst)
Method used to notify the node that a child node has failed.
YACS::Event updateStateOnFinishedEventFrom(Node *node)
Method used to notify the node that a child node has finished.
OutputPort * getOutputPort(const std::string &name) const
Get an output port given its name.
OutPort * getDynOutPortByAbsName(int branchNb, const std::string &name)
unsigned _execCurrentId
ports created for TypeCodes correctness
YACS::Event updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish)
static const char NAME_OF_SPLITTERNODE[]
std::list< ProgressWeight > getProgressWeight() const
Get the progress weight for all elementary nodes.
YACS::Event updateStateForFinalizeNodeOnFinishedEventFrom(Node *node, unsigned int id)
std::vector< SequenceAny * > _passedOutputs
std::vector< std::string > _nameOfOutputs
int toAbsIdNot(int localId) const
void checkCompatibilyWithNb(int nbOfElts) const
ForEachLoopPassedData(const std::vector< unsigned int > &passedIds, const std::vector< SequenceAny * > &passedOutputs, const std::vector< std::string > &nameOfOutputs)
void checkLevel2(const std::vector< AnyInputPort * > &ports) const
std::vector< unsigned int > _passedIds
int toAbsId(int localId) const
void assignAlreadyDone(const std::vector< SequenceAny * > &execVals) const
void accept(Visitor *visitor)
Node * simpleClone(ComposedNode *father, bool editionOnly=true) const
bool exIsReady() const
Definition: InGate.cxx:126
friend class InterceptorInputPort
Definition: InPort.hxx:61
bool canSafelySqueezeMemory() const
Definition: InPort.cxx:73
Base class for Input Ports.
Definition: InputPort.hxx:44
virtual void exInit(bool start)
Definition: InputPort.cxx:63
virtual void releaseData()=0
InputPort * clone(Node *newHelder) const
Definition: ForEachLoop.cxx:78
void getAllRepresentants(std::set< InPort * > &repr) const
Definition: ForEachLoop.cxx:71
void setRepr(AnySplitOutputPort *repr)
Definition: ForEachLoop.cxx:83
Base class for all nodes.
Definition: Node.hxx:70
std::string _name
Definition: Node.hxx:89
virtual void sendEvent2(const std::string &event, void *something)
emit notification to all observers registered with the dispatcher
Definition: Node.cxx:709
virtual void exForwardFinished()
Definition: Node.cxx:386
virtual void setErrorDetails(const std::string &error)
Definition: Node.hxx:191
friend class ForEachLoop
Definition: Node.hxx:78
ComposedNode * _father
Definition: Node.hxx:90
std::string getColorState(YACS::StatesForNode state) const
Return the color associated to a state.
Definition: Node.cxx:578
InGate _inGate
Definition: Node.hxx:86
virtual void exUpdateState()
Update the node state.
Definition: Node.cxx:206
const std::string getId() const
Definition: Node.cxx:478
virtual void init(bool start=true)
Definition: Node.cxx:102
ComposedNode * getFather() const
Definition: Node.hxx:127
virtual void writeDot(std::ostream &os) const
Dump to the input stream a dot representation of the node.
Definition: Node.cxx:611
void setState(YACS::StatesForNode theState)
Sets the given state for node.
Definition: Node.cxx:652
virtual YACS::StatesForNode getState() const
Definition: Node.hxx:118
virtual void sendEvent(const std::string &event)
emit notification to all observers registered with the dispatcher
Definition: Node.cxx:691
static const char SEP_CHAR_IN_PORT[]
Definition: Node.hxx:94
const std::string & getName() const
Definition: Node.hxx:125
virtual void exForwardFailed()
Definition: Node.cxx:378
virtual YACS::StatesForNode getEffectiveState() const
Return the node state in the context of its father.
Definition: Node.cxx:538
YACS::StatesForNode _state
Definition: Node.hxx:91
virtual int removeInPort(InPort *inPort, bool forward)=0
virtual bool addInPort(InPort *inPort)=0
virtual void getAllRepresented(std::set< OutPort * > &represented) const
Definition: OutPort.cxx:45
bool addInPort(InPort *inPort)
Definition: OutputPort.cxx:205
int removeInPort(InPort *inPort, bool forward)
Definition: OutputPort.cxx:228
std::set< InputPort * > _setOfInputPort
Definition: OutputPort.hxx:77
std::set< InPort * > edSetInPort() const
Definition: OutputPort.cxx:239
static const char NAME[]
Definition: OutputPort.hxx:79
Base class for all ports.
Definition: Port.hxx:43
Node * getNode() const
Definition: Port.hxx:46
virtual std::string dump()
InputPort * clone(Node *newHelder) const
Any * getValueAtRank(int i) const
SeqAnyInputPort(const std::string &name, Node *node, TypeCodeSeq *type)
unsigned getNumberOfElements() const
void setEltAtRank(int i, const Any *elem)
Definition: Any.cxx:696
unsigned int size() const
Definition: Any.hxx:209
static SequenceAny * New(const std::vector< T > &vec)
Definition: Any.hxx:318
void putSplittedValueOnRankTo(int rankInSeq, int branch, bool first)
InputPort * getInputPort(const std::string &name) const
SplitterNode(const std::string &name, TypeCode *typeOfData, ForEachLoopGen *father)
SeqAnyInputPort _dataPortToDispatch
void init(bool start=true)
unsigned getNumberOfElements() const
Node * simpleClone(ComposedNode *father, bool editionOnly) const
static const char NAME_OF_SEQUENCE_INPUT[]
Definition: ForEachLoop.hxx:93
Class for sequence objects.
Definition: TypeCode.hxx:160
Base class for all type objects.
Definition: TypeCode.hxx:68
virtual const TypeCode * contentType() const
Definition: TypeCode.cxx:174
static TypeCode * sequenceTc(const char *id, const char *name, TypeCode *content)
static factory of sequence type given an id, a name and a content type
Definition: TypeCode.cxx:254
const TypeCode * subContentType(int lev) const
Definition: TypeCode.cxx:206
virtual void visitForEachLoopDyn(ForEachLoopDyn *node)=0
virtual void visitForEachLoop(ForEachLoop *node)=0
Proc * p
Definition: driver.cxx:216
YACSLIBENGINE_EXPORT std::string ToBase64(const std::string &bytes)
Definition: Any.cxx:85
Event
Definition: define.hxx:56
@ ABORT
Definition: define.hxx:60
@ NOEVENT
Definition: define.hxx:57
@ FINISH
Definition: define.hxx:59
StatesForNode
Definition: define.hxx:34
@ FAILED
Definition: define.hxx:51
@ ACTIVATED
Definition: define.hxx:41
@ DONE
Definition: define.hxx:43
@ TOACTIVATE
Definition: define.hxx:40
@ DISABLED
Definition: define.hxx:50
@ ERROR
Definition: define.hxx:52