Version: 8.3.0
ForEachLoop.cxx
Go to the documentation of this file.
1 // Copyright (C) 2006-2016 CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19 
20 #include "ForEachLoop.hxx"
21 #include "TypeCode.hxx"
22 #include "Visitor.hxx"
23 #include "ComposedNode.hxx"
24 #include "Executor.hxx"
25 #include "AutoLocker.hxx"
26 
27 #include <iostream>
28 #include <iomanip>
29 #include <sstream>
30 #include <algorithm> // std::replace_if
31 
32 //#define _DEVDEBUG_
33 #include "YacsTrace.hxx"
34 
35 using namespace YACS::ENGINE;
36 using namespace std;
37 
44 const char FakeNodeForForEachLoop::NAME[]="thisIsAFakeNode";
45 
46 const char SplitterNode::NAME_OF_SEQUENCE_INPUT[]="SmplsCollection";
47 
48 const char ForEachLoop::NAME_OF_SPLITTERNODE[]="splitter";
49 
51 
52 const char ForEachLoop::INTERCEPTOR_STR[]="_interceptor";
53 
54 InterceptorInputPort::InterceptorInputPort(const std::string& name, Node *node, TypeCode* type):AnyInputPort(name,node,type),
55  DataPort(name,node,type),Port(node),
56  _repr(0)
57 {
58 }
59 
60 InterceptorInputPort::InterceptorInputPort(const InterceptorInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),
61  Port(other,newHelder),
62  _repr(0)
63 {
64 }
65 
66 void InterceptorInputPort::getAllRepresentants(std::set<InPort *>& repr) const
67 {
68  set<InPort *> ports=_repr->edSetInPort();
69  for(set<InPort *>::iterator iter=ports.begin();iter!=ports.end();iter++)
70  (*iter)->getAllRepresentants(repr);
71 }
72 
74 {
75  return new InterceptorInputPort(*this,newHelder);
76 }
77 
79 {
80  _repr=repr;
81 }
82 
84 {
85  return (--_cnt==0);
86 }
87 
89 {
90  _cnt++;
91 }
92 
93 AnySplitOutputPort::AnySplitOutputPort(const std::string& name, Node *node, TypeCode *type):OutputPort(name,node,type),
94  DataPort(name,node,type),Port(node),
95  _repr(0),_intercptr(0),_cnt(1)
96 {
97 }
98 
99 AnySplitOutputPort::AnySplitOutputPort(const AnySplitOutputPort& other, Node *newHelder):OutputPort(other,newHelder),
100  DataPort(other,newHelder),
101  Port(other,newHelder),
102  _repr(0),_intercptr(0),_cnt(1)
103 {
104 }
105 
107 {
108  bool ret=OutputPort::addInPort(inPort);
109  if(_repr)
110  _repr->addInPort(_intercptr);
111  return ret;
112 }
113 
114 void AnySplitOutputPort::getAllRepresented(std::set<OutPort *>& represented) const
115 {
116  if(!_repr)
117  OutPort::getAllRepresented(represented);
118  else
119  _repr->getAllRepresented(represented);
120 }
121 
122 int AnySplitOutputPort::removeInPort(InPort *inPort, bool forward) throw(YACS::Exception)
123 {
124  bool ret=OutputPort::removeInPort(inPort,forward);
125  if(_repr)
126  if(_setOfInputPort.empty())
127  _repr->removeInPort(_intercptr,forward);
128  return ret;
129 }
130 
132 {
133  _repr=repr;
134  _intercptr=intercptr;
135 }
136 
138 {
139  return new AnySplitOutputPort(*this,newHelder);
140 }
141 
142 SeqAnyInputPort::SeqAnyInputPort(const std::string& name, Node *node, TypeCodeSeq* type):AnyInputPort(name,node,type),DataPort(name,node,type),Port(node)
143 {
144  _type->decrRef();
145 }
146 
147 SeqAnyInputPort::SeqAnyInputPort(const SeqAnyInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),Port(other,newHelder)
148 {
149 }
150 
152 {
153  return new SeqAnyInputPort(*this,newHelder);
154 }
155 
157 {
158  const SequenceAny * valCsted=(const SequenceAny *) _value;
159  if (valCsted) return valCsted->size();
160  return 0;
161 }
162 
164 {
165  const SequenceAny * valCsted=(const SequenceAny *) _value;
166  AnyPtr ret=(*valCsted)[i];
167  ret->incrRef();
168  return ret;
169 }
170 
172 {
173  stringstream xmldump;
174  int nbElem = getNumberOfElements();
175  xmldump << "<value><array><data>" << endl;
176  for (int i = 0; i < nbElem; i++)
177  {
178  Any *val = getValueAtRank(i);
179  switch (val->getType()->kind())
180  {
181  case Double:
182  xmldump << "<value><double>" << setprecision(16) << val->getDoubleValue() << "</double></value>" << endl;
183  break;
184  case Int:
185  xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
186  break;
187  case Bool:
188  xmldump << "<value><boolean>" << val->getBoolValue() << "</boolean></value>" << endl;
189  break;
190  case String:
191  xmldump << "<value><string>" << val->getStringValue() << "</string></value>" << endl;
192  break;
193  case Objref:
194  xmldump << "<value><objref>" << val->getStringValue() << "</objref></value>" << endl;
195  break;
196  default:
197  xmldump << "<value><error> NO_SERIALISATION_AVAILABLE </error></value>" << endl;
198  break;
199  }
200  }
201  xmldump << "</data></array></value>" << endl;
202  return xmldump.str();
203 }
204 
205 SplitterNode::SplitterNode(const std::string& name, TypeCode *typeOfData,
206  ForEachLoop *father):ElementaryNode(name),
207  _dataPortToDispatch(NAME_OF_SEQUENCE_INPUT,
208  this,(TypeCodeSeq *)TypeCode::sequenceTc("","",typeOfData))
209 {
210  _father=father;
211 }
212 
214  _dataPortToDispatch(other._dataPortToDispatch,this)
215 {
216 }
217 
218 InputPort *SplitterNode::getInputPort(const std::string& name) const throw(YACS::Exception)
219 {
220  if(name==NAME_OF_SEQUENCE_INPUT)
221  return (InputPort *)&_dataPortToDispatch;
222  else
223  return ElementaryNode::getInputPort(name);
224 }
225 
226 Node *SplitterNode::simpleClone(ComposedNode *father, bool editionOnly) const
227 {
228  return new SplitterNode(*this,(ForEachLoop *)father);
229 }
230 
232 {
234 }
235 
237 {
238  //Nothing : should never been called elsewhere big problem...
239 }
240 
241 void SplitterNode::init(bool start)
242 {
243  ElementaryNode::init(start);
245 }
246 
247 void SplitterNode::putSplittedValueOnRankTo(int rankInSeq, int branch, bool first)
248 {
249  Any *valueToDispatch=_dataPortToDispatch.getValueAtRank(rankInSeq);
250  ForEachLoop *fatherTyped=(ForEachLoop *)_father;
251  fatherTyped->putValueOnBranch(valueToDispatch,branch,first);
252  valueToDispatch->decrRef();
253 }
254 
256  _loop(loop),
257  _normalFinish(normalFinish)
258 {
261 }
262 
264  _normalFinish(false)
265 {
266 }
267 
268 Node *FakeNodeForForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
269 {
270  return new FakeNodeForForEachLoop(*this);
271 }
272 
274 {
276 }
277 
279 {
281 }
282 
284 {
285  if(!_normalFinish)
286  throw Exception("");//only to trigger ABORT on Executor
287  else
289 }
290 
292 {
294 }
295 
297 {
299 }
300 
301 ForEachLoopPassedData::ForEachLoopPassedData(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs):_passedIds(passedIds),_passedOutputs(passedOutputs),_nameOfOutputs(nameOfOutputs)
302 {
303  std::size_t sz(_passedIds.size()),sz1(passedOutputs.size()),sz2(nameOfOutputs.size());
304  if(sz1!=sz2)
305  throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : nameOfOutputs and passedOutputs must have the same size !");
306  for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
307  {
308  const SequenceAny *elt(*it);
309  if(elt)
310  if(sz!=(std::size_t)elt->size())
311  throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : incoherent input of passed data !");
312  }
313  for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
314  {
315  SequenceAny *elt(*it);
316  if(elt)
317  elt->incrRef();
318  }
319 }
320 
322 : _passedIds(copy._passedIds),
323  _passedOutputs(copy._passedOutputs),
324  _nameOfOutputs(copy._nameOfOutputs),
325  _flagsIds(copy._flagsIds)
326 {
327 }
328 
330 {
331  for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
332  {
333  SequenceAny *elt(*it);
334  if(elt)
335  elt->decrRef();
336  }
337 }
338 
340 {
341  _flagsIds.clear();
342 }
343 
345 {
346  if(nbOfElts<0)
347  throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : nb of elts is expected to be > 0 !");
348  std::size_t sizeExp(_passedIds.size()),nbOfElts2(nbOfElts);
349  if(nbOfElts2<sizeExp)
350  throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set !");
351  for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
352  {
353  if((*it)>=nbOfElts2)
354  throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set 2 !");
355  }
356  _flagsIds.resize(nbOfElts);
357  std::fill(_flagsIds.begin(),_flagsIds.end(),false);
358  for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
359  {
360  if(*it<nbOfElts)
361  {
362  if(!_flagsIds[*it])
363  _flagsIds[*it]=true;
364  else
365  {
366  std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : id " << *it << " in list of ids appears more than once !";
367  throw YACS::Exception(oss.str());
368  }
369  }
370  else
371  {
372  std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : Presence of id " << *it << " in list of ids ! Must be in [0," << nbOfElts << ") !";
373  throw YACS::Exception(oss.str());
374  }
375  }
376 }
377 
378 void ForEachLoopPassedData::checkLevel2(const std::vector<AnyInputPort *>& ports) const
379 {
380  std::size_t sz(_nameOfOutputs.size());
381  if(sz!=ports.size())
382  throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : mismatch of size of vectors !");
383  for(std::size_t i=0;i<sz;i++)
384  {
385  AnyInputPort *elt(ports[i]);
386  if(!elt)
387  throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : presence of null instance !");
388  if(_nameOfOutputs[i]!=elt->getName())
389  {
390  std::ostringstream oss; oss << "ForEachLoopPassedData::checkLevel2 : At pos #" << i << " the name is not OK !";
391  throw YACS::Exception(oss.str());
392  }
393  }
394 }
395 
399 int ForEachLoopPassedData::toAbsId(int localId) const
400 {
401  if(localId<0)
402  throw YACS::Exception("ForEachLoopPassedData::toAbsId : local pos must be >= 0 !");
403  int ret(0),curLocId(0);
404  for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
405  {
406  if(!*it)
407  {
408  if(localId==curLocId)
409  return ret;
410  curLocId++;
411  }
412  }
413  throw YACS::Exception("ForEachLoopPassedData::toAbsId : not referenced Id !");
414 }
415 
419 int ForEachLoopPassedData::toAbsIdNot(int localId) const
420 {
421  if(localId<0)
422  throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : local pos must be >= 0 !");
423  int ret(0),curLocId(0);
424  for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
425  {
426  if(*it)//<- diff is here !
427  {
428  if(localId==curLocId)
429  return ret;
430  curLocId++;
431  }
432  }
433  throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : not referenced Id !");
434 }
435 
437 {
438  std::size_t nbAllElts(_flagsIds.size());
439  std::size_t ret(nbAllElts-_passedIds.size());
440  return ret;
441 }
442 
443 void ForEachLoopPassedData::assignAlreadyDone(const std::vector<SequenceAny *>& execVals) const
444 {
445  std::size_t sz(execVals.size());
446  if(_passedOutputs.size()!=sz)
447  throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : mismatch of size of vectors !");
448  for(std::size_t i=0;i<sz;i++)
449  {
451  SequenceAny *eltDestination(execVals[i]);
452  if(!elt)
453  throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : presence of null elt !");
454  unsigned int szOfElt(elt->size());
455  for(unsigned int j=0;j<szOfElt;j++)
456  {
457  AnyPtr elt1((*elt)[j]);
458  int jAbs(toAbsIdNot(j));
459  eltDestination->setEltAtRank(jAbs,elt1);
460  }
461  }
462 }
463 
464 ForEachLoop::ForEachLoop(const std::string& name, TypeCode *typeOfDataSplitted):DynParaLoop(name,typeOfDataSplitted),
465  _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
466  _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
467 {
468 }
469 
470 ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
471  _splitterNode(other._splitterNode,this),
472  _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
473 {
474  int i=0;
475  if(!editionOnly)
476  for(vector<AnySplitOutputPort *>::const_iterator iter2=other._outGoingPorts.begin();iter2!=other._outGoingPorts.end();iter2++,i++)
477  {
478  AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
480  temp->addRepr(getOutPort(other.getOutPortName((*iter2)->getRepr())),interc);
481  interc->setRepr(temp);
482  _outGoingPorts.push_back(temp);
483  _intecptrsForOutGoingPorts.push_back(interc);
484  }
485 }
486 
487 Node *ForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
488 {
489  return new ForEachLoop(*this,father,editionOnly);
490 }
491 
493 {
494  cleanDynGraph();
495  for(vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
496  delete *iter;
497  for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
498  delete *iter2;
499  delete _passedData;
500 }
501 
502 void ForEachLoop::init(bool start)
503 {
504  DynParaLoop::init(start);
505  _splitterNode.init(start);
506  _execCurrentId=0;
507  cleanDynGraph();
508  _currentIndex = 0;
510  if(_passedData)
511  _passedData->init();
512 }
513 
515 {
516  DEBTRACE("ForEachLoop::exUpdateState");
517  if(_state == YACS::DISABLED)
518  return;
519  if(_state == YACS::DONE)
520  return;
521  if(_inGate.exIsReady())
522  {
523  //internal graph update
524  int i;
525  int nbOfBr(_nbOfBranches.getIntValue()),nbOfElts(_splitterNode.getNumberOfElements()),nbOfEltsDone(0);
526  if(_passedData)
527  {
529  nbOfEltsDone=_passedData->getNumberOfEltsAlreadyDone();
530  }
531  int nbOfEltsToDo(nbOfElts-nbOfEltsDone);
532 
533  DEBTRACE("nbOfElts=" << nbOfElts);
534  DEBTRACE("nbOfBr=" << nbOfBr);
535 
536  if(nbOfEltsToDo==0)
537  {
539  delete _nodeForSpecialCases;
542  return ;
543  }
544  if(nbOfBr<=0)
545  {
546  delete _nodeForSpecialCases;
549  return ;
550  }
551  if(nbOfBr>nbOfEltsToDo)
552  nbOfBr=nbOfEltsToDo;
553  _execNodes.resize(nbOfBr);
554  _execIds.resize(nbOfBr);
555  _execOutGoingPorts.resize(nbOfBr);
556  prepareSequenceValues(nbOfElts);
557  if(_initNode)
558  _execInitNodes.resize(nbOfBr);
560  if (_finalizeNode)
561  _execFinalizeNodes.resize(nbOfBr);
562 
563  vector<Node *> origNodes;
564  origNodes.push_back(_initNode);
565  origNodes.push_back(_node);
566  origNodes.push_back(_finalizeNode);
567 
568  //Conversion exceptions can be thrown by createOutputOutOfScopeInterceptors
569  //so catch them to control errors
570  try
571  {
572  for(i=0;i<nbOfBr;i++)
573  {
574  DEBTRACE( "-------------- 2" );
575  vector<Node *> clonedNodes = cloneAndPlaceNodesCoherently(origNodes);
576  if(_initNode)
577  _execInitNodes[i] = clonedNodes[0];
578  _execNodes[i] = clonedNodes[1];
579  if(_finalizeNode)
580  _execFinalizeNodes[i] = clonedNodes[2];
581  DEBTRACE( "-------------- 4" );
583  DEBTRACE( "-------------- 5" );
585  DEBTRACE( "-------------- 6" );
586  }
587  for(i=0;i<nbOfBr;i++)
588  {
589  DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
591  int posInAbs(_execCurrentId);
592  if(_passedData)
594  _splitterNode.putSplittedValueOnRankTo(posInAbs,i,true);
595  _execCurrentId++;
596  DEBTRACE( "-------------- 7" );
597  }
598  if(_passedData)
599  {
602  }
603  }
604  catch(YACS::Exception& ex)
605  {
606  //ForEachLoop must be put in error and the exception rethrown to notify the caller
607  DEBTRACE( "ForEachLoop::exUpdateState: " << ex.what() );
609  exForwardFailed();
610  throw;
611  }
612 
613  setState(YACS::ACTIVATED); // move the calling of setState method there for adding observers for clone nodes in GUI part
614 
615  //let's go
616  for(i=0;i<nbOfBr;i++)
617  if(_initNode)
618  {
619  _execInitNodes[i]->exUpdateState();
621  }
622  else
623  {
625  _execNodes[i]->exUpdateState();
626  }
627 
629  }
630 }
631 
633 {
634  // emit notification to all observers registered with the dispatcher on any change of the node's state
635  sendEvent("progress");
636 }
637 
638 void ForEachLoop::getReadyTasks(std::vector<Task *>& tasks)
639 {
640  if(!_node)
641  return;
644  {
646  {
648  return ;
649  }
650  vector<Node *>::iterator iter;
651  for (iter=_execNodes.begin() ; iter!=_execNodes.end() ; iter++)
652  (*iter)->getReadyTasks(tasks);
653  for (iter=_execInitNodes.begin() ; iter!=_execInitNodes.end() ; iter++)
654  (*iter)->getReadyTasks(tasks);
655  for (iter=_execFinalizeNodes.begin() ; iter!=_execFinalizeNodes.end() ; iter++)
656  (*iter)->getReadyTasks(tasks);
657  }
658 }
659 
661 {
663 }
664 
666 {
667  //TO DO
668 }
669 
670 void ForEachLoop::selectRunnableTasks(std::vector<Task *>& tasks)
671 {
672 }
673 
674 std::list<InputPort *> ForEachLoop::getSetOfInputPort() const
675 {
676  list<InputPort *> ret=DynParaLoop::getSetOfInputPort();
677  ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
678  return ret;
679 }
680 
681 std::list<InputPort *> ForEachLoop::getLocalInputPorts() const
682 {
683  list<InputPort *> ret=DynParaLoop::getLocalInputPorts();
684  ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
685  return ret;
686 }
687 
688 InputPort *ForEachLoop::getInputPort(const std::string& name) const throw(YACS::Exception)
689 {
691  return (InputPort *)&_splitterNode._dataPortToDispatch;
692  else
693  return DynParaLoop::getInputPort(name);
694 }
695 
696 OutputPort *ForEachLoop::getOutputPort(const std::string& name) const throw(YACS::Exception)
697 {
698  for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
699  {
700  if(name==(*iter)->getName())
701  return (OutputPort *)(*iter);
702  }
703  return DynParaLoop::getOutputPort(name);
704 }
705 
706 OutPort *ForEachLoop::getOutPort(const std::string& name) const throw(YACS::Exception)
707 {
708  for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
709  {
710  if(name==(*iter)->getName())
711  return (OutPort *)(*iter);
712  }
713  return DynParaLoop::getOutPort(name);
714 }
715 
716 Node *ForEachLoop::getChildByShortName(const std::string& name) const throw(YACS::Exception)
717 {
718  if(name==NAME_OF_SPLITTERNODE)
719  return (Node *)&_splitterNode;
720  else
722 }
723 
725 
732 {
733  DEBTRACE("updateStateOnFinishedEventFrom " << node->getName() << " " << node->getState());
734  unsigned int id;
735  switch(getIdentityOfNotifyerNode(node,id))
736  {
737  case INIT_NODE:
739  case WORK_NODE:
740  return updateStateForWorkNodeOnFinishedEventFrom(node,id,true);
741  case FINALIZE_NODE:
743  default:
744  YASSERT(false);
745  }
746  return YACS::NOEVENT;
747 }
748 
750 {
751  _execNodes[id]->exUpdateState();
754  _currentIndex++;
755  if (_initializingCounter == 0)
757  return YACS::NOEVENT;
758 }
759 
763 YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish)
764 {
765  _currentIndex++;
767  if(isNormalFinish)
768  {
769  int globalId(_execIds[id]);
770  if(_passedData)
771  globalId=_passedData->toAbsId(globalId);
772  sendEvent2("progress_ok",&globalId);
774  }
775  else
776  {
777  int globalId(_execIds[id]);
778  if(_passedData)
779  globalId=_passedData->toAbsId(globalId);
780  sendEvent2("progress_ko",&globalId);
781  }
782  //
784  {//No more elements of _dataPortToDispatch to treat
786  //analyzing if some samples are still on treatment on other branches.
787  bool isFinished(true);
788  for(int i=0;i<_execIds.size() && isFinished;i++)
789  isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
790  if(isFinished)
791  {
792  try
793  {
794  if(_failedCounter!=0)
795  {// case of keepgoing mode + a failed
796  std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !";
797  DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom : "<< oss.str());
799  return YACS::ABORT;
800  }
802 
803  if (_node)
804  {
806 
807  ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
808  if (compNode)
809  {
810  std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
811  std::list<Node *>::iterator iter=aChldn.begin();
812  for(;iter!=aChldn.end();iter++)
813  (*iter)->setState(YACS::DONE);
814  }
815  }
816 
817  if (_finalizeNode == NULL)
818  {
819  // No finalize node, we just finish the loop at the end of exec nodes execution
821  return YACS::FINISH;
822  }
823  else
824  {
825  // Run the finalize nodes, the loop will be done only when they all finish
826  _unfinishedCounter = 0; // This counter indicates how many branches are not finished
827  for (int i=0 ; i<_execIds.size() ; i++)
828  {
830  DEBTRACE("Launching finalize node for branch " << i);
831  _execFinalizeNodes[i]->exUpdateState();
833  }
834  return YACS::NOEVENT;
835  }
836  }
837  catch(YACS::Exception& ex)
838  {
839  DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
840  //no way to push results : put following nodes in FAILED state
841  //TODO could be more fine grain : put only concerned nodes in FAILED state
842  exForwardFailed();
844  return YACS::ABORT;
845  }
846  }
847  }
848  else if(_state == YACS::ACTIVATED)
849  {//more elements to do and loop still activated
851  node->init(false);
852  int posInAbs(_execCurrentId);
853  if(_passedData)
855  _splitterNode.putSplittedValueOnRankTo(posInAbs,id,false);
856  _execCurrentId++;
857  node->exUpdateState();
860  }
861  else
862  {//elements to process and loop no more activated
863  DEBTRACE("foreach loop state " << _state);
864  }
865  return YACS::NOEVENT;
866 }
867 
869 {
870  DEBTRACE("Finalize node finished on branch " << id);
872  _currentIndex++;
874  DEBTRACE(_unfinishedCounter << " finalize nodes still running");
875  if (_unfinishedCounter == 0)
876  {
879  return YACS::FINISH;
880  }
881  else
882  return YACS::NOEVENT;
883 }
884 
886 {
887  unsigned int id;
889  // TODO: deal with keepgoing without the dependency to Executor
890  if(ton!=WORK_NODE || !execInst->getKeepGoingProperty())
891  return DynParaLoop::updateStateOnFailedEventFrom(node,execInst);
892  else
893  {
894  _failedCounter++;
895  return updateStateForWorkNodeOnFinishedEventFrom(node,id,false);
896  }
897 }
898 
899 void ForEachLoop::InterceptorizeNameOfPort(std::string& portName)
900 {
901  std::replace_if(portName.begin(), portName.end(), std::bind1st(std::equal_to<char>(), '.'), '_');
902  portName += INTERCEPTOR_STR;
903 }
904 
905 void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
906 {
907  DynParaLoop::buildDelegateOf(port,finalTarget,pointsOfView);
908  string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
909  if(typeOfPortInstance==OutputPort::NAME)
910  {
911  vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
912  int i=0;
913  for(;iter!=_outGoingPorts.end();iter++,i++)
914  if((*iter)->getRepr()==port.first || *iter==port.first)
915  break;
916  if(iter!=_outGoingPorts.end())
917  {
918  if(*iter!=port.first)
919  {
920  (*iter)->incrRef();
921  (*iter)->addRepr(port.first,_intecptrsForOutGoingPorts[i]);
922  }
923  port.first=*iter;
924  }
925  else
926  {
927  TypeCode *tcTrad((YACS::ENGINE::TypeCode*)finalTarget->edGetType()->subContentType(getFEDeltaBetween(port.first,finalTarget)));
928  TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",tcTrad);
929  // The out going ports belong to the ForEachLoop, whereas
930  // the delegated port belongs to a node child of the ForEachLoop.
931  // The name of the delegated port contains dots (bloc.node.outport),
932  // whereas the name of the out going port shouldn't do.
933  std::string outputPortName(getPortName(port.first));
934  InterceptorizeNameOfPort(outputPortName);
935  AnySplitOutputPort *newPort(new AnySplitOutputPort(outputPortName,this,newTc));
936  InterceptorInputPort *intercptor(new InterceptorInputPort(outputPortName + "_in",this,tcTrad));
937  intercptor->setRepr(newPort);
938  newTc->decrRef();
939  newPort->addRepr(port.first,intercptor);
940  _outGoingPorts.push_back(newPort);
941  _intecptrsForOutGoingPorts.push_back(intercptor);
942  port.first=newPort;
943  }
944  }
945  else
946  throw Exception("ForEachLoop::buildDelegateOf : not implemented for DS because not specified");
947 }
948 
949 void ForEachLoop::getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
950 {
951  string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
952  if(typeOfPortInstance==OutputPort::NAME)
953  {
954  vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
955  for(;iter!=_outGoingPorts.end();iter++)
956  if((*iter)->getRepr()==port.first)
957  break;
958  if(iter==_outGoingPorts.end())
959  {
960  string what("ForEachLoop::getDelegateOf : Port with name "); what+=port.first->getName(); what+=" not exported by ForEachLoop "; what+=_name;
961  throw Exception(what);
962  }
963  else
964  port.first=(*iter);
965  }
966  else
967  throw Exception("ForEachLoop::getDelegateOf : not implemented because not specified");
968 }
969 
970 void ForEachLoop::releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
971 {
972  string typeOfPortInstance=portDwn->getNameOfTypeOfCurrentInstance();
973  if(typeOfPortInstance==OutputPort::NAME)
974  {
975  vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
976  vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();
977  for(;iter!=_outGoingPorts.end();iter++,iter2++)
978  if((*iter)->getRepr()==portDwn)
979  break;
980  //ASSERT(portUp==*iter.second)
981  if((*iter)->decrRef())
982  {
983  AnySplitOutputPort *p=*iter;
984  _outGoingPorts.erase(iter);
985  delete p;
986  InterceptorInputPort *ip=*iter2;
987  _intecptrsForOutGoingPorts.erase(iter2);
988  delete ip;
989  }
990  }
991 }
992 
993 OutPort *ForEachLoop::getDynOutPortByAbsName(int branchNb, const std::string& name)
994 {
995  string portName, nodeName;
996  splitNamesBySep(name,Node::SEP_CHAR_IN_PORT,nodeName,portName,false);
997  Node *staticChild = getChildByName(nodeName);
998  return _execNodes[branchNb]->getOutPort(portName);//It's impossible(garanteed by YACS::ENGINE::ForEachLoop::buildDelegateOf)
999  //that a link starting from _initNode goes out of scope of 'this'.
1000 }
1001 
1003 {
1005  for(vector< SequenceAny *>::iterator iter3=_execVals.begin();iter3!=_execVals.end();iter3++)
1006  (*iter3)->decrRef();
1007  _execVals.clear();
1008  for(vector< vector<AnyInputPort *> >::iterator iter4=_execOutGoingPorts.begin();iter4!=_execOutGoingPorts.end();iter4++)
1009  for(vector<AnyInputPort *>::iterator iter5=(*iter4).begin();iter5!=(*iter4).end();iter5++)
1010  delete *iter5;
1011  _execOutGoingPorts.clear();
1012 }
1013 
1015 {
1016  vector<AnyInputPort *>::iterator iter;
1017  int i=0;
1018  for(iter=_execOutGoingPorts[branchNb].begin();iter!=_execOutGoingPorts[branchNb].end();iter++,i++)
1019  {
1020  Any *val=(Any *)(*iter)->getValue();
1021  _execVals[i]->setEltAtRank(rank,val);
1022  }
1023 }
1024 
1026 {
1027  if(!_passedData)
1029  else
1031 }
1032 
1033 void ForEachLoop::prepareSequenceValues(int sizeOfSamples)
1034 {
1035  _execVals.resize(_outGoingPorts.size());
1036  vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1037  for(int i=0;iter!=_outGoingPorts.end();iter++,i++)
1038  _execVals[i]=SequenceAny::New((*iter)->edGetType()->contentType(),sizeOfSamples);
1039 }
1040 
1042 {
1043  vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1044  int i=0;
1045  for(;iter!=_outGoingPorts.end();iter++,i++)
1046  (*iter)->put((const void *)_execVals[i]);
1047 }
1048 
1050 {
1051  vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1052  int i=0;
1053  for(;iter!=_outGoingPorts.end();iter++,i++)
1054  {
1055  DEBTRACE( (*iter)->getName() << " " << (*iter)->edGetType()->kind() );
1056  //AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,(*iter)->edGetType());
1057  OutPort *portOut=getDynOutPortByAbsName(branchNb,getOutPortName(((*iter)->getRepr())));
1058  DEBTRACE( portOut->getName() );
1059  TypeCode *tc((TypeCode *)(*iter)->edGetType()->contentType());
1060  AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,tc);
1061  portOut->addInPort(interceptor);
1062  _execOutGoingPorts[branchNb].push_back(interceptor);
1063  }
1064 }
1065 
1066 void ForEachLoop::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
1067  InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(YACS::Exception)
1068 {
1069  DynParaLoop::checkLinkPossibility(start, pointsOfViewStart, end, pointsOfViewEnd);
1070  if(end->getNode() == &_splitterNode)
1071  throw Exception("Illegal link within a foreach loop: \
1072 the 'SmplsCollection' port cannot be linked within the scope of the loop.");
1073  if(end == &_nbOfBranches)
1074  throw Exception("Illegal link within a foreach loop: \
1075 the 'nbBranches' port cannot be linked within the scope of the loop.");
1076 }
1077 
1078 std::list<OutputPort *> ForEachLoop::getLocalOutputPorts() const
1079 {
1080  list<OutputPort *> ret;
1081  ret.push_back(getOutputPort(NAME_OF_SPLITTED_SEQ_OUT));
1082  return ret;
1083 }
1084 
1086 {
1087  visitor->visitForEachLoop(this);
1088 }
1089 
1091 
1094 void ForEachLoop::writeDot(std::ostream &os) const
1095 {
1096  os << " subgraph cluster_" << getId() << " {\n" ;
1097  //only one node in a loop
1098  if(_node)
1099  {
1100  _node->writeDot(os);
1101  os << getId() << " -> " << _node->getId() << ";\n";
1102  }
1103  os << "}\n" ;
1104  os << getId() << "[fillcolor=\"" ;
1106  os << getColorState(state);
1107  os << "\" label=\"" << "Loop:" ;
1108  os << getName() <<"\"];\n";
1109 }
1110 
1113 {
1114  if(level==0)return;
1115  DynParaLoop::resetState(level);
1116  _execCurrentId=0;
1117  //Note: cleanDynGraph is not a virtual method (must be called from ForEachLoop object)
1118  cleanDynGraph();
1119 }
1120 
1121 std::string ForEachLoop::getProgress() const
1122 {
1123  int nbElems(getNbOfElementsToBeProcessed());
1124  std::stringstream aProgress;
1125  if (nbElems > 0)
1126  aProgress << _currentIndex << "/" << nbElems;
1127  else
1128  aProgress << "0";
1129  return aProgress.str();
1130 }
1131 
1133 
1137 list<ProgressWeight> ForEachLoop::getProgressWeight() const
1138 {
1139  list<ProgressWeight> ret;
1140  list<Node *> setOfNode=edGetDirectDescendants();
1141  int elemDone=getCurrentIndex();
1142  int elemTotal=getNbOfElementsToBeProcessed();
1143  for(list<Node *>::const_iterator iter=setOfNode.begin();iter!=setOfNode.end();iter++)
1144  {
1145  list<ProgressWeight> myCurrentSet=(*iter)->getProgressWeight();
1146  for(list<ProgressWeight>::iterator iter=myCurrentSet.begin();iter!=myCurrentSet.end();iter++)
1147  {
1148  (*iter).weightDone=((*iter).weightTotal) * elemDone;
1149  (*iter).weightTotal*=elemTotal;
1150  }
1151  ret.insert(ret.end(),myCurrentSet.begin(),myCurrentSet.end());
1152  }
1153  return ret;
1154 }
1155 
1157 {
1158  int nbBranches = _nbOfBranches.getIntValue();
1160  + (_initNode ? nbBranches:0)
1161  + (_finalizeNode ? nbBranches:0) ;
1162 }
1163 
1178 std::vector<unsigned int> ForEachLoop::getPassedResults(Executor *execut, std::vector<SequenceAny *>& outputs, std::vector<std::string>& nameOfOutputs) const
1179 {
1181  if(_execVals.empty())
1182  return std::vector<unsigned int>();
1183  if(_execOutGoingPorts.empty())
1184  return std::vector<unsigned int>();
1185  std::size_t sz(_execVals.size());
1186  outputs.resize(sz);
1187  nameOfOutputs.resize(sz);
1188  const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
1189  for(std::size_t i=0;i<sz;i++)
1190  {
1191  outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
1192  nameOfOutputs[i]=ports[i]->getName();
1193  }
1194  return _execVals[0]->getSetItems();
1195 }
1196 
1201 void ForEachLoop::assignPassedResults(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs)
1202 {
1203  delete _passedData;
1204  _failedCounter=0;
1205  _passedData=new ForEachLoopPassedData(passedIds,passedOutputs,nameOfOutputs);
1206 }
1207 
1209 {
1210  Node *ns(start->getNode()),*ne(end->getNode());
1212  int ret(0);
1213  Node *work(ns);
1214  while(work!=co)
1215  {
1216  ForEachLoop *isFE(dynamic_cast<ForEachLoop *>(work));
1217  if(isFE)
1218  ret++;
1219  work=work->getFather();
1220  }
1221  if(dynamic_cast<AnySplitOutputPort *>(start))
1222  ret--;
1223  return ret;
1224 }
1225 
1231 {
1232  std::vector<SequenceAny *> outputs;
1233  std::vector<std::string> nameOfOutputs;
1234  if(_execVals.empty() || _execOutGoingPorts.empty())
1235  return new ForEachLoopPassedData(std::vector<unsigned int>(), outputs, nameOfOutputs);
1236  std::size_t sz(_execVals.size());
1237  outputs.resize(sz);
1238  nameOfOutputs.resize(sz);
1239  const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
1240  for(std::size_t i=0;i<sz;i++)
1241  {
1242  outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
1243  nameOfOutputs[i]=ports[i]->getName();
1244  }
1245  return new ForEachLoopPassedData(_execVals[0]->getSetItems(), outputs, nameOfOutputs);
1246 }
1247 
1249 {
1250  if(_passedData)
1251  delete _passedData;
1252  _passedData = processedData;
1253 }
1254 
1258 const YACS::ENGINE::TypeCode* ForEachLoop::getOutputPortType(const std::string& portName)const
1259 {
1260  const YACS::ENGINE::TypeCode* ret=NULL;
1261  vector<AnySplitOutputPort *>::const_iterator it;
1262  for(it=_outGoingPorts.begin();it!=_outGoingPorts.end() && ret==NULL;it++)
1263  {
1264  std::string originalPortName(getPortName(*it));
1265  //InterceptorizeNameOfPort(originalPortName);
1266  DEBTRACE("ForEachLoop::getOutputPortType compare " << portName << " == " << originalPortName);
1267  if(originalPortName == portName)
1268  {
1269  ret = (*it)->edGetType()->contentType();
1270  }
1271  }
1272  return ret;
1273 }