Version: 8.3.0
Executor.cxx
Go to the documentation of this file.
1 // Copyright (C) 2006-2016 CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19 
20 #include "Executor.hxx"
21 #include "Task.hxx"
22 #include "AutoLocker.hxx"
23 #include "Scheduler.hxx"
24 #include "Dispatcher.hxx"
25 #include "Container.hxx"
27 #include "ComponentInstance.hxx"
28 
29 #include "VisitorSaveState.hxx"
30 #include "ServiceNode.hxx"
31 #include "ComposedNode.hxx"
32 
33 #include <iostream>
34 #include <fstream>
35 #include <sys/stat.h>
36 #ifndef WIN32
37 #include <sys/time.h>
38 #include <unistd.h>
39 #endif
40 
41 #include <cstdlib>
42 #include <algorithm>
43 
44 #ifdef WIN32
45 #define usleep(A) _sleep(A/1000)
46 #if !defined(S_ISCHR) || !defined(S_ISREG)
47 # ifndef S_IFMT
48 # ifdef _S_IFMT
49 # define S_IFMT _S_IFMT
50 # define S_IFCHR _S_IFCHR
51 # define S_IFREG _S_IFREG
52 # else
53 # ifdef __S_IFMT
54 # define S_IFMT __S_IFMT
55 # define S_IFCHR __S_IFCHR
56 # define S_IFREG __S_IFREG
57 # endif
58 # endif
59 # endif
60 # define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
61 # define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
62 #endif
63 #endif
64 
65 using namespace YACS::ENGINE;
66 using namespace std;
67 
68 using YACS::BASES::Mutex;
69 using YACS::BASES::Thread;
70 using YACS::BASES::Semaphore;
71 
72 //#define _DEVDEBUG_
73 #include "YacsTrace.hxx"
74 
// Class-wide default used by the constructor to initialise _semForMaxThreads.
75 int Executor::_maxThreads(1000);
// Stack size handed to Thread::go() when loadParallelTasks() starts its loader threads.
76 size_t Executor::_threadStackSize(262144); // Default thread stack size is 256 kB == 2**18 because threads launched by YACS are lightweight
77 
// Default constructor: no root node yet, main loop flag set to "continue",
// every stop/dump/error flag cleared, and the thread-limiting semaphore
// initialised from the class-wide _maxThreads value.
// NOTE(review): listing lines 86-89 are absent from this view, so further
// member initialisations may exist -- TODO confirm against the full source.
78 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
79 {
80  _root=0;
81  _toContinue = true;
82  _isOKToEnd = false;
83  _stopOnErrorRequested = false;
84  _dumpOnErrorRequested = false;
85  _errorDetected = false;
90  DEBTRACE("Executor initialized with max threads = " << _maxThreads);
91 }
92 
94 {
95  for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
96  delete *iter;
97 }
98 
100 
// Execute a graph with a simple fetch / load / launch loop.
//
// graph       : scheduler to run; it must also be a ComposedNode, otherwise an
//               Exception is thrown.
// debug       : verbosity of the _displayDot() calls (>0: once per iteration,
//               >1: around init and load/launch, >2: around task selection).
// fromScratch : when true, graph->init() and graph->exUpdateState() are called
//               before the loop.
//
// NOTE(review): several statements are absent from this view (gaps in the
// listing numbering: 130-132, 134, 137, 142, 159). The braces labelled
// "Critical section" show no lock here -- TODO confirm against the full source.
112 void Executor::RunA(Scheduler *graph,int debug, bool fromScratch)
113 {
// NOTE(review): the trace text says "RunW" although this is RunA.
114  DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
115  _mainSched=graph;
116  _root = dynamic_cast<ComposedNode *>(_mainSched);
117  if (!_root) throw Exception("Executor::Run, Internal Error!");
// isMore is filled by getNextTasks() but not read afterwards in the visible code.
118  bool isMore;
// i counts loop iterations; it is incremented but never read in the visible code.
119  int i=0;
120  if(debug>1)_displayDot(graph);
121  if (fromScratch)
122  {
123  graph->init();
124  graph->exUpdateState();
125  }
126  if(debug>1)_displayDot(graph);
127  vector<Task *> tasks;
128  vector<Task *>::iterator iter;
129  _toContinue=true;
133  _runningTasks.clear();
// Main loop: runs until the graph reports that it is finished.
135  while(_toContinue)
136  {
138 
139  if(debug>2)_displayDot(graph);
140 
141  {//Critical section
// Ask the graph for candidate tasks, then let it narrow the vector to runnable ones.
143  tasks=graph->getNextTasks(isMore);
144  graph->selectRunnableTasks(tasks);
145  }//End of critical section
146 
147  if(debug>2)_displayDot(graph);
148 
// Tasks are loaded one by one in this thread; loadTask() ignores tasks not in TOLOAD state.
149  for(iter=tasks.begin();iter!=tasks.end();iter++)
150  loadTask(*iter,this);
151 
152  if(debug>1)_displayDot(graph);
153 
154  launchTasks(tasks);
155 
156  if(debug>1)_displayDot(graph);
157 
158  {//Critical section
159  _toContinue=!graph->isFinished();
161  }//End of critical section
162  DEBTRACE("_toContinue: " << _toContinue);
163 
164  if(debug>0)_displayDot(graph);
165 
166  i++;
167  }
168 }
169 
171 
// Execute a graph with breakpoint support, event notification and an execution
// trace file.
//
// graph       : scheduler to run; it must also be a ComposedNode, otherwise an
//               Exception is thrown.
// debug       : verbosity of the _displayDot() calls (higher value = more dumps).
// fromScratch : when true, graph->init() and graph->exUpdateState() are called
//               first; an Exception raised there is traced, an "executor" event
//               is sent, and the exception is rethrown.
//
// The trace file is named "traceExec_" + graph name and is opened at the start;
// _start records the start time used later by traceExec().
//
// NOTE(review): many statements are absent from this view (gaps in the listing
// numbering, e.g. 236, 240, 245-246, 248, 272, 277, 287, 292, 296, 299, 303,
// 309, 312, 318, 342, 354, 358, 363, 365, 368). The braces labelled
// "Critical section" show no lock here and some block headers are missing --
// TODO confirm against the full source before relying on these comments.
231 void Executor::RunB(Scheduler *graph,int debug, bool fromScratch)
232 {
233  DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
234 
235  { // --- Critical section
237  _mainSched = graph;
238  _root = dynamic_cast<ComposedNode *>(_mainSched);
239  if (!_root) throw Exception("Executor::Run, Internal Error!");
241  sendEvent("executor");
242  _toContinue=true;
243  _isOKToEnd = false;
244  _errorDetected = false;
247  _runningTasks.clear();
249  string tracefile = "traceExec_";
250  tracefile += _mainSched->getName();
251  _trace.open(tracefile.c_str());
252 #ifdef WIN32
253  _start = timeGetTime();
254 #else
255  gettimeofday(&_start, NULL);
256 #endif
257 
258  } // --- End of critical section
259 
260  if (debug > 1) _displayDot(graph);
261 
262  if (fromScratch)
263  {
264  try
265  {
266  graph->init();
267  graph->exUpdateState();
268  }
269  catch(Exception& ex)
270  {
271  DEBTRACE("exception: "<< (ex.what()));
273  sendEvent("executor");
274  throw;
275  }
276  }
278  sendEvent("executor");
279 
280  if (debug > 1) _displayDot(graph);
281 
// iter is declared but not used in the visible code of this function.
282  vector<Task *>::iterator iter;
283  bool isMore;
// problemCount counts iterations that found nothing running and nothing to launch.
284  int problemCount=0;
285  int numberAllTasks;
286 
288  sendEvent("executor");
289  while (_toContinue)
290  {
291  DEBTRACE("--- executor main loop");
293  DEBTRACE("--- events...");
294  if (debug > 2) _displayDot(graph);
295  { // --- Critical section
297  _tasks=graph->getNextTasks(isMore);
298  graph->selectRunnableTasks(_tasks);
// Running tasks plus tasks selected this round; zero triggers the "problem" path below.
300  numberAllTasks=_numberOfRunningTasks+_tasks.size();
301  } // --- End of critical section
302  if (debug > 2) _displayDot(graph);
304  {
// A true return from checkBreakPoints() leaves the loop with _toContinue still true.
305  if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306  if (debug > 0) _displayDot(graph);
307  DEBTRACE("---");
308  //loadTasks(_tasks);
310  if (debug > 1) _displayDot(graph);
311  DEBTRACE("---");
313  DEBTRACE("---");
314  }
315  if (debug > 1) _displayDot(graph);
316  { // --- Critical section
317  DEBTRACE("---");
319  //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
320  if(_numberOfRunningTasks == 0)
321  _toContinue = !graph->isFinished();
322 
323  DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
324  DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
325  DEBTRACE("_toContinue: " << _toContinue);
326  if(_toContinue && numberAllTasks==0)
327  {
328  //Problem : no running tasks and no task to launch ??
329  problemCount++;
330  std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
331  //Pause to give a chance to interrupt
332  usleep(1000);
// After more than 25 such empty rounds the loop is abandoned.
333  if(problemCount > 25)
334  {
335  // Too much problems encountered : stop execution
336  _toContinue=false;
337  }
338  }
339 
340  if (! _toContinue)
341  {
343  sendEvent("executor");
344  _condForPilot.notify_all();
345  }
346  } // --- End of critical section
347  if (debug > 0) _displayDot(graph);
348  DEBTRACE("_toContinue: " << _toContinue);
349  }
350 
351  DEBTRACE("End of main Loop");
352 
353  { // --- Critical section
355  if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
356  {
357  DEBTRACE("stop requested: End soon");
359  _toContinue = false;
360  sendEvent("executor");
361  }
362  } // --- End of critical section
364  {
366  }
367  {
369  _trace.close();
370  }
371  DEBTRACE("End of RunB thread");
372 }
373 
375 {
377  return _execMode;
378 }
379 
380 
382 {
384  return _executorState;
385 }
386 
387 
389 {
391  return _toContinue;
392 }
393 
395 
// Record the dump-on-error settings.
//
// dumpRequested : stored in _dumpOnErrorRequested.
// xmlFile       : stored in _dumpErrorFile; must be non-empty when
//                 dumpRequested is true, otherwise YACS::Exception is thrown.
//
// NOTE(review): both members are assigned before the validity check, so they
// keep the rejected values when the exception is thrown. Listing lines 403 and
// 405 are absent from this view -- TODO confirm against the full source.
400 void Executor::setStopOnError(bool dumpRequested, std::string xmlFile)
401 {
402  { // --- Critical section
404  _dumpErrorFile=xmlFile;
406  _dumpOnErrorRequested = dumpRequested;
407  if (dumpRequested && xmlFile.empty())
408  throw YACS::Exception("dump on error requested and no filename given for dump");
409  DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
410  } // --- End of critical section
411 }
412 
414 
418 {
419  { // --- Critical section
421  _stopOnErrorRequested=false;
422  } // --- End of critical section
423 }
424 
426 
432 {
433  DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
434  { // --- Critical section
437  _execMode = mode;
438  } // --- End of critical section
439 }
440 
442 
450 {
451  DEBTRACE("Executor::resumeCurrentBreakPoint()");
452  bool ret = false;
453  //bool doDump = false;
454  { // --- Critical section
457  DEBTRACE("_executorState: " << _executorState);
458  switch (_executorState)
459  {
460  case YACS::WAITINGTASKS:
461  case YACS::PAUSED:
462  {
463  _condForStepByStep.notify_all();
465  sendEvent("executor");
466  ret = true;
467  //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
468  break;
469  }
470  case YACS::FINISHED:
471  case YACS::STOPPED:
472  {
473  //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
474  DEBTRACE("Graph Execution finished or stopped !");
475  break;
476  }
477  default :
478  {
479  // debug: no easy way to verify if main loop is acutally waiting on condition
480  }
481  }
482  DEBTRACE("---");
483  //if (doDump) saveState(_dumpErrorFile);
484  } // --- End of critical section
485  return ret;
486 }
487 
488 
490 
491 
// Replace the stored breakpoint list (_listOfBreakPoints) with a copy of
// listOfBreakPoints. checkBreakPoints() compares task names against this list.
// NOTE(review): listing lines 496-497 are absent from this view; the
// "Critical section" braces show no lock here -- TODO confirm.
492 void Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)
493 {
494  DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
495  { // --- Critical section
498  _listOfBreakPoints = listOfBreakPoints;
499  } // --- End of critical section
500 }
501 
502 
504 
// Return a copy of _listOfTasksToLoad when the executor state is WAITINGTASKS
// or PAUSED; in every other state an empty list is returned.
// NOTE(review): listing lines 514-515 and 524 are absent from this view (a lock
// and one more case label, presumably) -- TODO confirm against the full source.
508 std::list<std::string> Executor::getTasksToLoad()
509 {
510  DEBTRACE("Executor::getTasksToLoad()");
511  list<string> listOfNodesToLoad;
// Redundant on a freshly constructed list; kept as in the original.
512  listOfNodesToLoad.clear();
513  { // --- Critical section
516  switch (_executorState)
517  {
518  case YACS::WAITINGTASKS:
519  case YACS::PAUSED:
520  {
521  listOfNodesToLoad = _listOfTasksToLoad;
522  break;
523  }
525  case YACS::INITIALISED:
526  case YACS::RUNNING:
527  case YACS::FINISHED:
528  case YACS::STOPPED:
529  default:
530  {
531  break;
532  }
533  }
534  } // --- End of critical section
535  return listOfNodesToLoad;
536 }
537 
538 
540 
// Restrict the pending tasks (_tasks) to those whose name is in listToExecute.
//
// In state WAITINGTASKS or PAUSED, the tasks saved in _tasksSave whose name
// (as given by _mainSched->getTaskName) appears in listToExecute become the new
// content of _tasks. The function always returns true.
//
// NOTE(review): after the switch, _tasks is cleared and refilled from
// restrictedTasks a second time, outside the "Critical section" braces and
// regardless of the state. In any other state restrictedTasks is empty, so
// _tasks ends up empty -- confirm this is intended. Listing lines 553-554 and
// 577 are absent from this view.
546 bool Executor::setStepsToExecute(std::list<std::string> listToExecute)
547 {
548  DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
549  bool ret = true;
550  vector<Task *>::iterator iter;
551  vector<Task *> restrictedTasks;
552  { // --- Critical section
555  switch (_executorState)
556  {
557  case YACS::WAITINGTASKS:
558  case YACS::PAUSED:
559  {
// Keep only the saved tasks that the caller named.
560  for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
561  {
562  string readyNode = _mainSched->getTaskName(*iter);
563  if (find(listToExecute.begin(), listToExecute.end(), readyNode)
564  != listToExecute.end())
565  {
566  restrictedTasks.push_back(*iter);
567  DEBTRACE("node to execute " << readyNode);
568  }
569  }
570  _tasks.clear();
571  for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
572  {
573  _tasks.push_back(*iter);
574  }
575  break;
576  }
578  case YACS::INITIALISED:
579  case YACS::RUNNING:
580  case YACS::FINISHED:
581  case YACS::STOPPED:
582  default:
583  {
584  break;
585  }
586  }
587  } // --- End of critical section
588 
// Second, unconditional rebuild of _tasks from restrictedTasks (see the review note above).
589  _tasks.clear();
590  for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
591  {
592  _tasks.push_back(*iter);
593  }
// Debug-only pass: readyNode is used only by DEBTRACE.
594  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
595  {
596  string readyNode = _mainSched->getTaskName(*iter);
597  DEBTRACE("selected node to execute " << readyNode);
598  }
599 
600  return ret;
601 }
602 
604 
610 {
611  DEBTRACE("Executor::waitPause()" << _executorState);
612  { // --- Critical section
615  switch (_executorState)
616  {
617  default:
618  case YACS::STOPPED:
619  case YACS::FINISHED:
620  case YACS::WAITINGTASKS:
621  case YACS::PAUSED:
622  {
623  break;
624  }
626  case YACS::INITIALISED:
627  case YACS::RUNNING:
628  {
629  _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
630  break;
631  }
632  }
633  } // --- End of critical section
634  DEBTRACE("---");
635 }
636 
638 
640 {
642  //waitPause();
643  _isOKToEnd = true;
645 }
646 
648 
// Dump the state of the graph rooted at _root into the file xmlFile.
//
// Returns true when the dump completed, false when a YACS Exception was caught
// (its message is written to std::cerr). Other exception types are not caught.
//
// NOTE(review): listing lines 654-655 are absent from this view, so the
// declaration of `vst` is not visible -- TODO confirm its type and any locking
// against the full source.
649 bool Executor::saveState(const std::string& xmlFile)
650 {
651  DEBTRACE("Executor::saveState() in " << xmlFile);
652  bool result = false;
653  try {
656  vst.openFileDump(xmlFile.c_str());
657  _root->accept(&vst);
658  vst.closeFileDump();
659  result = true;
660  }
661  catch(Exception& ex) {
662  std::cerr << ex.what() << std::endl;
663  }
664  return result;
665 }
666 
668 
670 {
671  DEBTRACE("Executor::loadState()");
673  return true;
674 }
675 
676 
//! Tell whether \a filename designates an existing regular file.
/*!
 * \param filename path handed to stat()
 * \return 1 when stat() succeeds and the entry is a regular file, 0 otherwise
 *         (stat() failure, directory, device, ...)
 */
static int isfile(const char *filename)
{
  struct stat info;
  // The short-circuit keeps S_ISREG from reading info when stat() failed.
  const bool regular = (stat(filename, &info) == 0) && S_ISREG(info.st_mode);
  return regular ? 1 : 0;
}
686 
688 
690 {
692  _displayDot(graph);
693 }
694 
696 
701 {
702  std::ofstream g("titi");
703  ((ComposedNode*)graph)->writeDot(g);
704  g.close();
705  const char displayScript[]="display.sh";
706  if(isfile(displayScript))
707  system("sh display.sh");
708  else
709  system("dot -Tpng titi|display -delay 5");
710 }
711 
713 
724 {
725  DEBTRACE("Executor::checkBreakPoints()");
726  vector<Task *>::iterator iter;
727  bool endRequested = false;
728 
729  switch (_execMode)
730  {
731  case YACS::CONTINUE:
732  {
733  break;
734  }
736  {
737  bool stop = false;
738  { // --- Critical section
740  _tasksSave = _tasks;
741  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
742  {
743  string nodeToLoad = _mainSched->getTaskName(*iter);
744  if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
745  != _listOfBreakPoints.end())
746  {
747  stop = true;
748  break;
749  }
750  }
751  if (stop)
752  {
753  _listOfTasksToLoad.clear();
754  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
755  {
756  string nodeToLoad = _mainSched->getTaskName(*iter);
757  _listOfTasksToLoad.push_back(nodeToLoad);
758  }
759  if (getNbOfThreads())
760  _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
761  else
763  sendEvent("executor");
764  _condForPilot.notify_all();
765  }
766  if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
767  if (_isOKToEnd) endRequested = true;
768  } // --- End of critical section
769  if (stop) DEBTRACE("wake up from waitResume");
770  break;
771  }
772  default:
773  case YACS::STEPBYSTEP:
774  {
775  { // --- Critical section
777  _tasksSave = _tasks;
778  _listOfTasksToLoad.clear();
779  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
780  {
781  string nodeToLoad = _mainSched->getTaskName(*iter);
782  _listOfTasksToLoad.push_back(nodeToLoad);
783  }
784  if (getNbOfThreads())
785  _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
786  else
788  sendEvent("executor");
789  _condForPilot.notify_all();
790  if (!_isOKToEnd)
791  waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
792  // or, if no pilot, wait until no more running tasks (stop on error)
793  if (_isOKToEnd) endRequested = true;
794  } // --- End of critical section
795  DEBTRACE("wake up from waitResume");
796  break;
797  }
798  }
799  DEBTRACE("endRequested: " << endRequested);
800  return endRequested;
801 }
802 
803 
805 
812 {
813  DEBTRACE("Executor::waitResume()");
814  _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
815  DEBTRACE("---");
816 }
817 
818 
820 
// Load one task and initialise its service.
//
// task     : task to load; nothing is done unless its state is YACS::TOLOAD.
// execInst : executor instance forwarded to the scheduler notifications.
//
// The scheduler is notified with YACS::START, then task->load() and
// task->initService() are called. Any exception raised by these two calls is
// caught: the task is marked aborted, the scheduler is notified with
// YACS::ABORT and the resulting state is traced.
//
// NOTE(review): listing lines 831, 845 and 855 are absent from this view; the
// "Critical section" braces show no lock here -- TODO confirm.
824 void Executor::loadTask(Task *task, const Executor *execInst)
825 {
826  DEBTRACE("Executor::loadTask(Task *task)");
827  if(task->getState() != YACS::TOLOAD)
828  return;
829  traceExec(task, "state:TOLOAD", ComputePlacement(task));
830  {//Critical section
832  _mainSched->notifyFrom(task,YACS::START,execInst);
833  }//End of critical section
834  try
835  {
836  traceExec(task, "load", ComputePlacement(task));
837  task->load();
838  traceExec(task, "initService", ComputePlacement(task));
839  task->initService();
840  }
// YACS exceptions: report the message, then abort the task.
841  catch(Exception& ex)
842  {
843  std::cerr << ex.what() << std::endl;
844  {//Critical section
846  task->aborted();
847  _mainSched->notifyFrom(task,YACS::ABORT,execInst);
848  traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
849  }//End of critical section
850  }
// Any other exception: same abort handling with a generic message.
851  catch(...)
852  {
853  std::cerr << "Load failed" << std::endl;
854  {//Critical section
856  task->aborted();
857  _mainSched->notifyFrom(task,YACS::ABORT,execInst);
858  traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
859  }//End of critical section
860  }
861 }
862 
864 {
868 };
869 
870 void Executor::loadTasks(const std::vector<Task *>& tasks, const Executor *execInst)
871 {
872  for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++)
873  loadTask(*iter,execInst);
874 }
875 
876 void Executor::loadParallelTasks(const std::vector<Task *>& tasks, const Executor *execInst)
877 {
878  std::vector<Thread> ths(tasks.size());
879  std::size_t ithread(0);
880  for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
881  {
882  DEBTRACE("Executor::loadParallelTasks(Task *task)");
883  struct threadargs *args(new threadargs);
884  args->task = (*iter);
885  args->sched = _mainSched;
886  args->execInst = this;
887  ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
888  }
889  for(ithread=0;ithread<tasks.size();ithread++)
890  ths[ithread].join();
891 }
892 
894 
// Connect, then launch, a set of tasks.
//
// Phase 1: for each task in state TOLOAD or TORECONNECT, call connectService()
// and mark the task connected. If connectService() throws, disconnectService()
// is attempted, the task is marked aborted and the scheduler is notified with
// YACS::ABORT. When a task ends in state ERROR, every other coupled task (from
// getCoupledTasks) that is not already in ERROR is disconnected and aborted.
// Phase 2: launchTask() is called on every task of the vector.
//
// NOTE(review): listing lines 910, 928, 947, 973 and 975 are absent from this
// view; the "Critical section" braces show no lock here -- TODO confirm.
898 void Executor::launchTasks(const std::vector<Task *>& tasks)
899 {
900  //First phase, make datastream connections
901  for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
902  {
903  YACS::StatesForNode state=(*iter)->getState();
904  if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
905  try
906  {
907  (*iter)->connectService();
908  traceExec(*iter, "connectService",ComputePlacement(*iter));
909  {//Critical section
911  (*iter)->connected();
912  }//End of critical section
913  }
// YACS exception during connection: report it, then disconnect and abort the task.
914  catch(Exception& ex)
915  {
916  std::cerr << ex.what() << std::endl;
917  try
918  {
919  (*iter)->disconnectService();
920  traceExec(*iter, "disconnectService",ComputePlacement(*iter));
921  }
922  catch(...)
923  {
924  // Disconnect has failed
925  traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
926  }
927  {//Critical section
929  (*iter)->aborted();
930  _mainSched->notifyFrom(*iter,YACS::ABORT,this);
931  }//End of critical section
932  }
// Any other exception: same handling as above with a generic message.
933  catch(...)
934  {
935  std::cerr << "Problem in connectService" << std::endl;
936  try
937  {
938  (*iter)->disconnectService();
939  traceExec(*iter, "disconnectService",ComputePlacement(*iter));
940  }
941  catch(...)
942  {
943  // Disconnect has failed
944  traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
945  }
946  {//Critical section
948  (*iter)->aborted();
949  _mainSched->notifyFrom(*iter,YACS::ABORT,this);
950  }//End of critical section
951  }
952  if((*iter)->getState() == YACS::ERROR)
953  {
954  //try to put all coupled tasks in error
955  std::set<Task*> coupledSet;
956  (*iter)->getCoupledTasks(coupledSet);
957  for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
958  {
959  Task* t=*it;
960  if(t == *iter)continue;
961  if(t->getState() == YACS::ERROR)continue;
// NOTE(review): the trace lines for t use the placement of *iter, not of t -- confirm this is intended.
962  try
963  {
964  t->disconnectService();
965  traceExec(t, "disconnectService",ComputePlacement(*iter));
966  }
967  catch(...)
968  {
969  // Disconnect has failed
970  traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
971  }
972  {//Critical section
974  t->aborted();
976  }//End of critical section
977  traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
978  }
979  }
980  traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
981  }
982 
983  //Second phase, execute each task in a thread
984  for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
985  {
986  launchTask(*iter);
987  }
988 }
989 
991 
1000 {
1001  DEBTRACE("Executor::launchTask(Task *task)");
1002  struct threadargs *args;
1003  if(task->getState() != YACS::TOACTIVATE)return;
1004 
1005  DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
1006  if(_semThreadCnt == 0)
1007  {
1008  // --- Critical section
1010  //check if we have enough threads to run
1011  std::set<Task*> tmpSet=_runningTasks;
1012  std::set<Task*>::iterator it = tmpSet.begin();
1013  std::string status="running";
1014  std::set<Task*> coupledSet;
1015  while( it != tmpSet.end() )
1016  {
1017  Task* tt=*it;
1018  coupledSet.clear();
1019  tt->getCoupledTasks(coupledSet);
1020  status="running";
1021  for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1022  {
1023  if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1024  tmpSet.erase(*iter);
1025  }
1026  if(status=="running")break;
1027  it = tmpSet.begin();
1028  }
1029 
1030  if(status=="toactivate")
1031  {
1032  std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1033  std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1034  }
1035  // --- End of critical section
1036  }
1037 
1038  _semForMaxThreads.wait();
1039  _semThreadCnt -= 1;
1040 
1041  args= new threadargs;
1042  args->task = task;
1043  args->sched = _mainSched;
1044  args->execInst = this;
1045 
1046  traceExec(task, "launch",ComputePlacement(task));
1047 
1048  { // --- Critical section
1051  _runningTasks.insert(task);
1052  task->begin(); //change state to ACTIVATED
1053  } // --- End of critical section
1055 }
1056 
1058 
1060 {
1061  DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1062 // _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1065  {
1067  _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1068  }
1070  DEBTRACE("---");
1071 }
1072 
1074 
1075 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
1076 {
1077  /*_mutexForNbOfConcurrentThreads.lock();
1078  _groupOfAllThreadsCreated.remove(thread);
1079  delete thread;
1080  _mutexForNbOfConcurrentThreads.unlock();*/
1081 }
1082 
1083 
1085 
1087 {
1088  DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1090  {
1092  _condForNewTasksToPerform.notify_all();
1093  }
1094  else
1096 }
1097 
1099 
1101 {
1102  int ret;
1105  ret = _groupOfAllThreadsCreated.size();
1106  return ret;
1107 }
1108 
1113 {
1114  DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1115  struct threadargs *args = (struct threadargs *) arg;
1116  Task *task=args->task;
1117  Scheduler *sched=args->sched;
1118  Executor *execInst=args->execInst;
1119  delete args;
1120  execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1121  return 0;
1122 }
1123 
1125 
1138 {
1139  DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1140 
1141  struct threadargs *args = (struct threadargs *) arg;
1142  Task *task=args->task;
1143  Scheduler *sched=args->sched;
1144  Executor *execInst=args->execInst;
1145  delete args;
1146  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1147 
1148  Thread::detach();
1149 
1150  // Execute task
1151 
1152  if(execInst->getDPLScopeSensitive())
1153  {
1154  Node *node(dynamic_cast<Node *>(task));
1155  ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
1156  if(node!=0 && gfn!=0)
1157  node->applyDPLScope(gfn);
1158  }
1159 
1161  try
1162  {
1163  execInst->traceExec(task, "start execution",ComputePlacement(task));
1164  task->execute();
1165  execInst->traceExec(task, "end execution OK",ComputePlacement(task));
1166  }
1167  catch(Exception& ex)
1168  {
1169  std::cerr << "YACS Exception during execute" << std::endl;
1170  std::cerr << ex.what() << std::endl;
1171  ev=YACS::ABORT;
1172  string message = "end execution ABORT, ";
1173  message += ex.what();
1174  execInst->traceExec(task, message,ComputePlacement(task));
1175  }
1176  catch(...)
1177  {
1178  // Execution has failed
1179  std::cerr << "Execution has failed: unknown reason" << std::endl;
1180  ev=YACS::ABORT;
1181  execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1182  }
1183 
1184  // Disconnect task
1185  try
1186  {
1187  DEBTRACE("task->disconnectService()");
1188  task->disconnectService();
1189  execInst->traceExec(task, "disconnectService",ComputePlacement(task));
1190  }
1191  catch(...)
1192  {
1193  // Disconnect has failed
1194  std::cerr << "disconnect has failed" << std::endl;
1195  ev=YACS::ABORT;
1196  execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1197  }
1198  //
1199 
1200  std::string placement(ComputePlacement(task));
1201 
1202  // container management for HomogeneousPoolOfContainer
1203 
1204  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1205  if(contC)
1206  {
1207  YACS::BASES::AutoLocker<Container> alckCont(contC);
1208  contC->release(task);
1209  }
1210 
1211  DEBTRACE("End task->execute()");
1212  { // --- Critical section
1214  try
1215  {
1216  if (ev == YACS::FINISH) task->finished();
1217  if (ev == YACS::ABORT)
1218  {
1219  execInst->_errorDetected = true;
1220  if (execInst->_stopOnErrorRequested)
1221  {
1222  execInst->_execMode = YACS::STEPBYSTEP;
1223  execInst->_isOKToEnd = true;
1224  }
1225  task->aborted();
1226  }
1227  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1228  sched->notifyFrom(task,ev,execInst);
1229  }
1230  catch(Exception& ex)
1231  {
1232  //notify has failed : it is supposed to have set state
1233  //so no need to do anything
1234  std::cerr << "Error during notification" << std::endl;
1235  std::cerr << ex.what() << std::endl;
1236  }
1237  catch(...)
1238  {
1239  //notify has failed : it is supposed to have set state
1240  //so no need to do anything
1241  std::cerr << "Notification failed" << std::endl;
1242  }
1243  execInst->_numberOfRunningTasks--;
1244  execInst->_runningTasks.erase(task);
1245  DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks
1246  << " _execMode: " << execInst->_execMode
1247  << " _executorState: " << execInst->_executorState);
1248  if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1249  {
1250  if (execInst->_executorState == YACS::WAITINGTASKS)
1251  {
1252  execInst->_executorState = YACS::PAUSED;
1253  execInst->sendEvent("executor");
1254  execInst->_condForPilot.notify_all();
1255  if (execInst->_errorDetected &&
1256  execInst->_stopOnErrorRequested &&
1257  !execInst->_isRunningunderExternalControl)
1258  execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1259  }
1260  }
1261  DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1262  execInst->_semForMaxThreads.post();
1263  execInst->_semThreadCnt += 1;
1264  DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1265  if (execInst->_executorState != YACS::PAUSED) execInst->wakeUp();
1266 
1267  } // --- End of critical section (change state)
1268 
1269  //execInst->notifyEndOfThread(0);
1270  Thread::exit(0);
1271  return 0;
1272 }
1273 
// Append one line to the execution trace stream _trace.
//
// task      : task the line is about; its name comes from
//             _mainSched->getTaskName and its container name from
//             getContainer() ("---" when there is no container).
// message   : free text written at the end of the line.
// placement : placement string written after the container name.
//
// The line starts with the time elapsed, in seconds, since _start. The stream
// is flushed after every line.
//
// NOTE(review): listing line 1291 is absent from this view (presumably a lock
// guarding _trace) -- TODO confirm against the full source.
1274 void Executor::traceExec(Task *task, const std::string& message, const std::string& placement)
1275 {
1276  string nodeName = _mainSched->getTaskName(task);
1277  Container *cont = task->getContainer();
1278  string containerName = "---";
1279  if (cont)
1280  containerName = cont->getName();
1281 
1282 #ifdef WIN32
// timeGetTime() difference divided by 1000.0 (assumes milliseconds -- TODO confirm).
1283  DWORD now = timeGetTime();
1284  double elapse = (now - _start)/1000.0;
1285 #else
1286  timeval now;
1287  gettimeofday(&now, NULL);
1288  double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
1289 #endif
1290  {
1292  _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1293  _trace << flush;
1294  }
1295 }
1296 
1298 
// Dispatch the event named `event` on the root node _root through `disp`.
// Both `disp` and `_root` are checked with YASSERT before the dispatch.
// NOTE(review): listing line 1303 is absent from this view, so the declaration
// of `disp` is not visible -- TODO confirm against the full source.
1301 void Executor::sendEvent(const std::string& event)
1302 {
1304  YASSERT(disp);
1305  YASSERT(_root);
1306  disp->dispatch(_root,event);
1307 }
1308 
1315 void Executor::FilterTasksConsideringContainers(std::vector<Task *>& tsks)
1316 {
1317  std::map<HomogeneousPoolContainer *, std::vector<Task *> > m;
1318  for(std::vector<Task *>::const_iterator it=tsks.begin();it!=tsks.end();it++)
1319  {
1320  Task *cur(*it);
1321  if(!cur)
1322  continue;
1323  Container *cont(cur->getContainer());
1324  if(!cont)
1325  {
1326  m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1327  continue;
1328  }
1329  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1330  if(!contC)
1331  {
1332  m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1333  continue;
1334  }
1335  m[contC].push_back(cur);
1336  }
1337  //
1338  std::vector<Task *> ret;
1339  for(std::map<HomogeneousPoolContainer *, std::vector<Task *> >::const_iterator it=m.begin();it!=m.end();it++)
1340  {
1341  HomogeneousPoolContainer *curhpc((*it).first);
1342  const std::vector<Task *>& curtsks((*it).second);
1343  if(!curhpc)
1344  {
1345  ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1346  }
1347  else
1348  {
1349  // start of critical section for container curhpc
1350  YACS::BASES::AutoLocker<Container> alckForCont(curhpc);
1351  std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1352  std::size_t sz(curhpc->getNumberOfFreePlace());
1353  std::vector<Task *>::const_iterator it2(curtsks.begin());
1354  for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1355  {
1356  vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1357  ret.push_back(*it2);
1358  }
1359  curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1360  //end of critical section
1361  }
1362  }
1363  //
1364  tsks=ret;
1365 }
1366 
1367 std::string Executor::ComputePlacement(Task *zeTask)
1368 {
1369  std::string placement("---");
1370  if(!zeTask)
1371  return placement;
1372  if(zeTask->getContainer())
1373  placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1374  return placement;
1375 }