Version: 8.3.0
YACS::ENGINE::Executor Class Reference

Threaded Executor. More...

#include <Executor.hxx>

Collaboration diagram for YACS::ENGINE::Executor:

Public Member Functions

 Executor ()
 
 ~Executor ()
 
void RunA (Scheduler *graph, int debug=0, bool fromScratch=true)
 Execute a graph waiting for completion. More...
 
void RunW (Scheduler *graph, int debug=0, bool fromScratch=true)
 
void RunB (Scheduler *graph, int debug=0, bool fromScratch=true)
 Execute a graph with breakpoints or step by step. More...
 
void setKeepGoingProperty (bool newVal)
 
bool getKeepGoingProperty () const
 
void setDPLScopeSensitive (bool newVal)
 
bool getDPLScopeSensitive () const
 
YACS::ExecutionMode getCurrentExecMode ()
 
YACS::ExecutorState getExecutorState ()
 
void setExecMode (YACS::ExecutionMode mode)
 Dynamically set the current mode of execution. More...
 
void setListOfBreakPoints (std::list< std::string > listOfBreakPoints)
 define a list of node names as breakpoints in the graph More...
 
std::list< std::string > getTasksToLoad ()
 Get the list of tasks to load, to define a subset to execute in step by step mode. More...
 
bool setStepsToExecute (std::list< std::string > listToExecute)
 Define a subset of tasks to execute in step by step mode. More...
 
bool resumeCurrentBreakPoint ()
 wake up the executor when it is paused More...
 
bool isNotFinished ()
 
void stopExecution ()
 stops the execution as soon as possible More...
 
bool saveState (const std::string &xmlFile)
 save the current state of execution in an xml file More...
 
bool loadState ()
 not yet implemented More...
 
int getNbOfThreads ()
 number of running tasks More...
 
int getNumberOfRunningTasks () const
 
void displayDot (Scheduler *graph)
 Display the graph state as a dot display, public method. More...
 
void setStopOnError (bool dumpRequested=false, std::string xmlFile="")
 ask to stop execution on the first node found in error More...
 
void unsetStopOnError ()
 ask not to stop execution on nodes found in error More...
 
void waitPause ()
 suspend pilot execution until the Executor is paused or is waiting for task completion. More...
 
YACS::BASES::Mutex & getTheMutexForSchedulerUpdate ()
 

Static Public Attributes

static int _maxThreads
 
static size_t _threadStackSize
 

Protected Member Functions

bool checkBreakPoints ()
 Wait for reactivation in step-by-step mode or in breakpoint mode. More...
 
void waitResume ()
 in modes Step By step or with BreakPoint, wait until pilot resumes the execution More...
 
void loadTask (Task *task, const Executor *execInst)
 Perform loading of a Task. More...
 
void loadTasks (const std::vector< Task * > &tasks, const Executor *execInst)
 
void loadParallelTasks (const std::vector< Task * > &tasks, const Executor *execInst)
 
void launchTasks (const std::vector< Task * > &tasks)
 Execute a list of tasks possibly connected through datastream links. More...
 
void launchTask (Task *task)
 Execute a Task in a thread. More...
 
void wakeUp ()
 must be used protected by _mutexForSchedulerUpdate! More...
 
void sleepWhileNoEventsFromAnyRunningTask ()
 wait until a running task ends More...
 
void notifyEndOfThread (YACS::BASES::Thread *thread)
 not implemented More...
 
void traceExec (Task *task, const std::string &message, const std::string &placement)
 
void _displayDot (Scheduler *graph)
 Display the graph state as a dot display. More...
 
virtual void sendEvent (const std::string &event)
 emit notification to all observers registered with the dispatcher More...
 

Static Protected Member Functions

static void FilterTasksConsideringContainers (std::vector< Task * > &tsks)
 
static std::string ComputePlacement (Task *zeTask)
 
static void * functionForTaskLoad (void *)
 
static void * functionForTaskExecution (void *)
 Function to perform execution of a task in a thread. More...
 

Protected Attributes

Scheduler * _mainSched
 
ComposedNode * _root
 
int _nbOfConcurrentThreads
 
YACS::BASES::Mutex _mutexForNbOfConcurrentThreads
 
YACS::BASES::Condition _condForNewTasksToPerform
 
YACS::BASES::Semaphore _semForMaxThreads
 
YACS::BASES::Condition _condForStepByStep
 
YACS::BASES::Condition _condForPilot
 
YACS::BASES::Mutex _mutexForSchedulerUpdate
 
YACS::BASES::Mutex _mutexForTrace
 
bool _toContinue
 
bool _isOKToEnd
 
bool _stopOnErrorRequested
 
bool _dumpOnErrorRequested
 
bool _errorDetected
 
bool _isRunningunderExternalControl
 
bool _isWaitingEventsFromRunningTasks
 
int _numberOfRunningTasks
 
std::set< Task * > _runningTasks
 
int _numberOfEndedTasks
 
int _semThreadCnt
 
YACS::ExecutorState _executorState
 
YACS::ExecutionMode _execMode
 
std::list< std::string > _listOfBreakPoints
 
std::list< std::string > _listOfTasksToLoad
 
std::vector< Task * > _tasks
 
std::vector< Task * > _tasksSave
 
std::list< YACS::BASES::Thread * > _groupOfAllThreadsCreated
 
std::ofstream _trace
 
std::string _dumpErrorFile
 
bool _keepGoingOnFail
 
bool _DPLScopeSensitive
 specifies if scope DynParaLoop is active or not. False by default. More...
 
timeval _start
 

Detailed Description

Threaded Executor.

Definition at line 55 of file Executor.hxx.

Constructor & Destructor Documentation

Executor::~Executor ( )

Definition at line 93 of file Executor.cxx.

References _groupOfAllThreadsCreated.

94 {
95  for(list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
96  delete *iter;
97 }

Member Function Documentation

void Executor::_displayDot ( Scheduler * graph)
protected

Display the graph state as a dot display.

Parameters
graph: the node to display

Definition at line 700 of file Executor.cxx.

References isfile().

Referenced by displayDot(), RunA(), and RunB().

701 {
702  std::ofstream g("titi");
703  ((ComposedNode*)graph)->writeDot(g);
704  g.close();
705  const char displayScript[]="display.sh";
706  if(isfile(displayScript))
707  system("sh display.sh");
708  else
709  system("dot -Tpng titi|display -delay 5");
710 }
bool Executor::checkBreakPoints ( )
protected

Wait for reactivation in step-by-step mode or in breakpoint mode.

Check mode of execution (set by main thread):

Definition at line 723 of file Executor.cxx.

References _condForPilot, _execMode, _executorState, _isOKToEnd, _listOfBreakPoints, _listOfTasksToLoad, _mainSched, _mutexForSchedulerUpdate, _tasks, _tasksSave, YACS::CONTINUE, DEBTRACE, getNbOfThreads(), YACS::ENGINE::Scheduler::getTaskName(), YACS::PAUSED, sendEvent(), YACS::STEPBYSTEP, YACS::STOPBEFORENODES, YACS::WAITINGTASKS, and waitResume().

Referenced by RunB().

724 {
725  DEBTRACE("Executor::checkBreakPoints()");
726  vector<Task *>::iterator iter;
727  bool endRequested = false;
728 
729  switch (_execMode)
730  {
731  case YACS::CONTINUE:
732  {
733  break;
734  }
736  {
737  bool stop = false;
738  { // --- Critical section
740  _tasksSave = _tasks;
741  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
742  {
743  string nodeToLoad = _mainSched->getTaskName(*iter);
744  if (find(_listOfBreakPoints.begin(), _listOfBreakPoints.end(), nodeToLoad)
745  != _listOfBreakPoints.end())
746  {
747  stop = true;
748  break;
749  }
750  }
751  if (stop)
752  {
753  _listOfTasksToLoad.clear();
754  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
755  {
756  string nodeToLoad = _mainSched->getTaskName(*iter);
757  _listOfTasksToLoad.push_back(nodeToLoad);
758  }
759  if (getNbOfThreads())
760  _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
761  else
763  sendEvent("executor");
764  _condForPilot.notify_all();
765  }
766  if (stop && !_isOKToEnd) waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
767  if (_isOKToEnd) endRequested = true;
768  } // --- End of critical section
769  if (stop) DEBTRACE("wake up from waitResume");
770  break;
771  }
772  default:
773  case YACS::STEPBYSTEP:
774  {
775  { // --- Critical section
777  _tasksSave = _tasks;
778  _listOfTasksToLoad.clear();
779  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
780  {
781  string nodeToLoad = _mainSched->getTaskName(*iter);
782  _listOfTasksToLoad.push_back(nodeToLoad);
783  }
784  if (getNbOfThreads())
785  _executorState = YACS::WAITINGTASKS; // will be paused after completion of running tasks
786  else
788  sendEvent("executor");
789  _condForPilot.notify_all();
790  if (!_isOKToEnd)
791  waitResume(); // wait until pilot calls resumeCurrentBreakPoint(), mutex released during wait
792  // or, if no pilot, wait until no more running tasks (stop on error)
793  if (_isOKToEnd) endRequested = true;
794  } // --- End of critical section
795  DEBTRACE("wake up from waitResume");
796  break;
797  }
798  }
799  DEBTRACE("endRequested: " << endRequested);
800  return endRequested;
801 }
std::string Executor::ComputePlacement ( Task * zeTask)
staticprotected

Definition at line 1367 of file Executor.cxx.

References YACS::ENGINE::Task::getContainer(), and YACS::ENGINE::Container::getFullPlacementId().

Referenced by functionForTaskExecution(), launchTask(), launchTasks(), and loadTask().

1368 {
1369  std::string placement("---");
1370  if(!zeTask)
1371  return placement;
1372  if(zeTask->getContainer())
1373  placement=zeTask->getContainer()->getFullPlacementId(zeTask);
1374  return placement;
1375 }
void Executor::displayDot ( Scheduler * graph)

Display the graph state as a dot display, public method.

Definition at line 689 of file Executor.cxx.

References _displayDot(), and _isRunningunderExternalControl.

Referenced by main().

690 {
692  _displayDot(graph);
693 }
void Executor::FilterTasksConsideringContainers ( std::vector< Task * > &  tsks)
staticprotected

This method takes a list of tasks as input and selects a subset of it, considering only the containers of the tasks. If no task has a container that is an instance of a subclass of HomogeneousPoolContainer, this method leaves tsks untouched.

Parameters
[in,out] tsks - list of tasks to be filtered; on return it is replaced by the selected subset

Definition at line 1315 of file Executor.cxx.

References YACS::ENGINE::HomogeneousPoolContainer::allocateFor(), YACS::ENGINE::Task::getContainer(), YACS::ENGINE::HomogeneousPoolContainer::getNumberOfFreePlace(), CORBAEngineTest::i, gui.GraphViewer::m, and PMMLBasicsTestLauncher::ret.

Referenced by RunB().

1316 {
1317  std::map<HomogeneousPoolContainer *, std::vector<Task *> > m;
1318  for(std::vector<Task *>::const_iterator it=tsks.begin();it!=tsks.end();it++)
1319  {
1320  Task *cur(*it);
1321  if(!cur)
1322  continue;
1323  Container *cont(cur->getContainer());
1324  if(!cont)
1325  {
1326  m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1327  continue;
1328  }
1329  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(cont));
1330  if(!contC)
1331  {
1332  m[(HomogeneousPoolContainer *)NULL].push_back(cur);
1333  continue;
1334  }
1335  m[contC].push_back(cur);
1336  }
1337  //
1338  std::vector<Task *> ret;
1339  for(std::map<HomogeneousPoolContainer *, std::vector<Task *> >::const_iterator it=m.begin();it!=m.end();it++)
1340  {
1341  HomogeneousPoolContainer *curhpc((*it).first);
1342  const std::vector<Task *>& curtsks((*it).second);
1343  if(!curhpc)
1344  {
1345  ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1346  }
1347  else
1348  {
1349  // start of critical section for container curhpc
1350  YACS::BASES::AutoLocker<Container> alckForCont(curhpc);
1351  std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1352  std::size_t sz(curhpc->getNumberOfFreePlace());
1353  std::vector<Task *>::const_iterator it2(curtsks.begin());
1354  for(std::size_t i=0;i<sz && it2!=curtsks.end();i++,it2++)
1355  {
1356  vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1357  ret.push_back(*it2);
1358  }
1359  curhpc->allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1360  //end of critical section
1361  }
1362  }
1363  //
1364  tsks=ret;
1365 }
void * Executor::functionForTaskExecution ( void *  arg)
staticprotected

Function to perform execution of a task in a thread.

Parameters
arg: 3 elements (a Task, a Scheduler, an Executor)

Calls Task::execute

Calls Task::finished when the task is finished

Calls (notify with event YACS::FINISH) Scheduler::notifyFrom when the task is finished

Calls Executor::wakeUp and Executor::notifyEndOfThread

Definition at line 1137 of file Executor.cxx.

References _condForPilot, _condForStepByStep, _errorDetected, _execMode, _executorState, _isOKToEnd, _isRunningunderExternalControl, _mutexForSchedulerUpdate, _numberOfRunningTasks, _runningTasks, _semForMaxThreads, _semThreadCnt, _stopOnErrorRequested, YACS::ABORT, YACS::ENGINE::Task::aborted(), YACS::ENGINE::Node::applyDPLScope(), ComputePlacement(), YACS::CONTINUE, DEBTRACE, YACS::ENGINE::Task::disconnectService(), threadargs::execInst, YACS::ENGINE::Task::execute(), YACS::FINISH, YACS::ENGINE::Task::finished(), YACS::ENGINE::Task::getContainer(), getDPLScopeSensitive(), YACS::ENGINE::Task::getState(), YACS::ENGINE::Node::getStateName(), YACS::ENGINE::Scheduler::notifyFrom(), YACS::PAUSED, YACS::ENGINE::HomogeneousPoolContainer::release(), threadargs::sched, sendEvent(), YACS::STEPBYSTEP, threadargs::task, traceExec(), YACS::WAITINGTASKS, wakeUp(), and YACS::Exception::what().

Referenced by launchTask().

1138 {
1139  DEBTRACE("Executor::functionForTaskExecution(void *arg)");
1140 
1141  struct threadargs *args = (struct threadargs *) arg;
1142  Task *task=args->task;
1143  Scheduler *sched=args->sched;
1144  Executor *execInst=args->execInst;
1145  delete args;
1146  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),ComputePlacement(task));
1147 
1148  Thread::detach();
1149 
1150  // Execute task
1151 
1152  if(execInst->getDPLScopeSensitive())
1153  {
1154  Node *node(dynamic_cast<Node *>(task));
1155  ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
1156  if(node!=0 && gfn!=0)
1157  node->applyDPLScope(gfn);
1158  }
1159 
1161  try
1162  {
1163  execInst->traceExec(task, "start execution",ComputePlacement(task));
1164  task->execute();
1165  execInst->traceExec(task, "end execution OK",ComputePlacement(task));
1166  }
1167  catch(Exception& ex)
1168  {
1169  std::cerr << "YACS Exception during execute" << std::endl;
1170  std::cerr << ex.what() << std::endl;
1171  ev=YACS::ABORT;
1172  string message = "end execution ABORT, ";
1173  message += ex.what();
1174  execInst->traceExec(task, message,ComputePlacement(task));
1175  }
1176  catch(...)
1177  {
1178  // Execution has failed
1179  std::cerr << "Execution has failed: unknown reason" << std::endl;
1180  ev=YACS::ABORT;
1181  execInst->traceExec(task, "end execution ABORT, unknown reason",ComputePlacement(task));
1182  }
1183 
1184  // Disconnect task
1185  try
1186  {
1187  DEBTRACE("task->disconnectService()");
1188  task->disconnectService();
1189  execInst->traceExec(task, "disconnectService",ComputePlacement(task));
1190  }
1191  catch(...)
1192  {
1193  // Disconnect has failed
1194  std::cerr << "disconnect has failed" << std::endl;
1195  ev=YACS::ABORT;
1196  execInst->traceExec(task, "disconnectService failed, ABORT",ComputePlacement(task));
1197  }
1198  //
1199 
1200  std::string placement(ComputePlacement(task));
1201 
1202  // container management for HomogeneousPoolOfContainer
1203 
1204  HomogeneousPoolContainer *contC(dynamic_cast<HomogeneousPoolContainer *>(task->getContainer()));
1205  if(contC)
1206  {
1207  YACS::BASES::AutoLocker<Container> alckCont(contC);
1208  contC->release(task);
1209  }
1210 
1211  DEBTRACE("End task->execute()");
1212  { // --- Critical section
1214  try
1215  {
1216  if (ev == YACS::FINISH) task->finished();
1217  if (ev == YACS::ABORT)
1218  {
1219  execInst->_errorDetected = true;
1220  if (execInst->_stopOnErrorRequested)
1221  {
1222  execInst->_execMode = YACS::STEPBYSTEP;
1223  execInst->_isOKToEnd = true;
1224  }
1225  task->aborted();
1226  }
1227  execInst->traceExec(task, "state:"+Node::getStateName(task->getState()),placement);
1228  sched->notifyFrom(task,ev,execInst);
1229  }
1230  catch(Exception& ex)
1231  {
1232  //notify has failed : it is supposed to have set state
1233  //so no need to do anything
1234  std::cerr << "Error during notification" << std::endl;
1235  std::cerr << ex.what() << std::endl;
1236  }
1237  catch(...)
1238  {
1239  //notify has failed : it is supposed to have set state
1240  //so no need to do anything
1241  std::cerr << "Notification failed" << std::endl;
1242  }
1243  execInst->_numberOfRunningTasks--;
1244  execInst->_runningTasks.erase(task);
1245  DEBTRACE("_numberOfRunningTasks: " << execInst->_numberOfRunningTasks
1246  << " _execMode: " << execInst->_execMode
1247  << " _executorState: " << execInst->_executorState);
1248  if ((execInst->_numberOfRunningTasks == 0) && (execInst->_execMode != YACS::CONTINUE)) // no more running tasks
1249  {
1250  if (execInst->_executorState == YACS::WAITINGTASKS)
1251  {
1252  execInst->_executorState = YACS::PAUSED;
1253  execInst->sendEvent("executor");
1254  execInst->_condForPilot.notify_all();
1255  if (execInst->_errorDetected &&
1256  execInst->_stopOnErrorRequested &&
1257  !execInst->_isRunningunderExternalControl)
1258  execInst->_condForStepByStep.notify_all(); // exec thread may be on waitResume
1259  }
1260  }
1261  DEBTRACE("before _semForMaxThreads.post " << execInst->_semThreadCnt);
1262  execInst->_semForMaxThreads.post();
1263  execInst->_semThreadCnt += 1;
1264  DEBTRACE("after _semForMaxThreads.post " << execInst->_semThreadCnt);
1265  if (execInst->_executorState != YACS::PAUSED) execInst->wakeUp();
1266 
1267  } // --- End of critical section (change state)
1268 
1269  //execInst->notifyEndOfThread(0);
1270  Thread::exit(0);
1271  return 0;
1272 }
void * Executor::functionForTaskLoad ( void *  arg)
staticprotected

This thread is NOT supposed to be detached !

Definition at line 1112 of file Executor.cxx.

References DEBTRACE, threadargs::execInst, loadTask(), threadargs::sched, and threadargs::task.

Referenced by loadParallelTasks().

1113 {
1114  DEBTRACE("Executor::functionForTaskLoad(void *arg)");
1115  struct threadargs *args = (struct threadargs *) arg;
1116  Task *task=args->task;
1117  Scheduler *sched=args->sched;
1118  Executor *execInst=args->execInst;
1119  delete args;
1120  execInst->loadTask(task,execInst);// no throw of this method - all throw are catched !
1121  return 0;
1122 }
YACS::ExecutionMode Executor::getCurrentExecMode ( )

Definition at line 374 of file Executor.cxx.

References _execMode, and _isRunningunderExternalControl.

375 {
377  return _execMode;
378 }
bool YACS::ENGINE::Executor::getDPLScopeSensitive ( ) const
inline

Definition at line 105 of file Executor.hxx.

Referenced by functionForTaskExecution().

105 { return _DPLScopeSensitive; }
YACS::ExecutorState Executor::getExecutorState ( )

Definition at line 381 of file Executor.cxx.

References _executorState, and _isRunningunderExternalControl.

Referenced by main().

382 {
384  return _executorState;
385 }
bool YACS::ENGINE::Executor::getKeepGoingProperty ( ) const
inline

Definition at line 103 of file Executor.hxx.

Referenced by YACS::ENGINE::ForEachLoop::updateStateOnFailedEventFrom().

103 { return _keepGoingOnFail; }
int Executor::getNbOfThreads ( )

number of running tasks (as shown in the listing below, the returned value is the size of _groupOfAllThreadsCreated)

Definition at line 1100 of file Executor.cxx.

References _groupOfAllThreadsCreated, _isRunningunderExternalControl, _mutexForNbOfConcurrentThreads, and PMMLBasicsTestLauncher::ret.

Referenced by checkBreakPoints().

1101 {
1102  int ret;
1105  ret = _groupOfAllThreadsCreated.size();
1106  return ret;
1107 }
int YACS::ENGINE::Executor::getNumberOfRunningTasks ( ) const
inline

Definition at line 118 of file Executor.hxx.

118 { return _numberOfRunningTasks; }
std::list< std::string > Executor::getTasksToLoad ( )

Get the list of tasks to load, to define a subset to execute in step by step mode.

If the executor is in neither state YACS::WAITINGTASKS nor state YACS::PAUSED, the list is empty. Use Executor::waitPause to wait.

Definition at line 508 of file Executor.cxx.

References _executorState, _isRunningunderExternalControl, _listOfTasksToLoad, _mutexForSchedulerUpdate, DEBTRACE, YACS::FINISHED, YACS::INITIALISED, YACS::NOTYETINITIALIZED, YACS::PAUSED, YACS::RUNNING, YACS::STOPPED, and YACS::WAITINGTASKS.

509 {
510  DEBTRACE("Executor::getTasksToLoad()");
511  list<string> listOfNodesToLoad;
512  listOfNodesToLoad.clear();
513  { // --- Critical section
516  switch (_executorState)
517  {
518  case YACS::WAITINGTASKS:
519  case YACS::PAUSED:
520  {
521  listOfNodesToLoad = _listOfTasksToLoad;
522  break;
523  }
525  case YACS::INITIALISED:
526  case YACS::RUNNING:
527  case YACS::FINISHED:
528  case YACS::STOPPED:
529  default:
530  {
531  break;
532  }
533  }
534  } // --- End of critical section
535  return listOfNodesToLoad;
536 }
YACS::BASES::Mutex& YACS::ENGINE::Executor::getTheMutexForSchedulerUpdate ( )
inline
bool Executor::isNotFinished ( )

Definition at line 388 of file Executor.cxx.

References _isRunningunderExternalControl, and _toContinue.

Referenced by main().

389 {
391  return _toContinue;
392 }
void Executor::launchTask ( Task * task)
protected

Execute a Task in a thread.

Parameters
task: Task to execute

Calls Scheduler::notifyFrom of main node (_mainSched) to notify start

Calls Executor::functionForTaskExecution in Thread

Definition at line 999 of file Executor.cxx.

References _mainSched, _maxThreads, _mutexForSchedulerUpdate, _numberOfRunningTasks, _runningTasks, _semForMaxThreads, _semThreadCnt, _threadStackSize, YACS::ENGINE::Task::begin(), ComputePlacement(), DEBTRACE, threadargs::execInst, functionForTaskExecution(), YACS::ENGINE::Task::getCoupledTasks(), YACS::ENGINE::Task::getState(), threadargs::sched, threadargs::task, YACS::TOACTIVATE, and traceExec().

Referenced by launchTasks().

1000 {
1001  DEBTRACE("Executor::launchTask(Task *task)");
1002  struct threadargs *args;
1003  if(task->getState() != YACS::TOACTIVATE)return;
1004 
1005  DEBTRACE("before _semForMaxThreads.wait " << _semThreadCnt);
1006  if(_semThreadCnt == 0)
1007  {
1008  // --- Critical section
1010  //check if we have enough threads to run
1011  std::set<Task*> tmpSet=_runningTasks;
1012  std::set<Task*>::iterator it = tmpSet.begin();
1013  std::string status="running";
1014  std::set<Task*> coupledSet;
1015  while( it != tmpSet.end() )
1016  {
1017  Task* tt=*it;
1018  coupledSet.clear();
1019  tt->getCoupledTasks(coupledSet);
1020  status="running";
1021  for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1022  {
1023  if((*iter)->getState() == YACS::TOACTIVATE)status="toactivate";
1024  tmpSet.erase(*iter);
1025  }
1026  if(status=="running")break;
1027  it = tmpSet.begin();
1028  }
1029 
1030  if(status=="toactivate")
1031  {
1032  std::cerr << "WARNING: maybe you need more threads to run your schema (current value="<< _maxThreads << ")" << std::endl;
1033  std::cerr << "If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1034  }
1035  // --- End of critical section
1036  }
1037 
1038  _semForMaxThreads.wait();
1039  _semThreadCnt -= 1;
1040 
1041  args= new threadargs;
1042  args->task = task;
1043  args->sched = _mainSched;
1044  args->execInst = this;
1045 
1046  traceExec(task, "launch",ComputePlacement(task));
1047 
1048  { // --- Critical section
1051  _runningTasks.insert(task);
1052  task->begin(); //change state to ACTIVATED
1053  } // --- End of critical section
1055 }
void Executor::launchTasks ( const std::vector< Task * > &  tasks)
protected

Execute a list of tasks possibly connected through datastream links.

Parameters
tasks: a list of tasks to execute

Definition at line 898 of file Executor.cxx.

References _mainSched, _mutexForSchedulerUpdate, YACS::ABORT, YACS::ENGINE::Task::aborted(), ComputePlacement(), YACS::ENGINE::Task::disconnectService(), YACS::ERROR, YACS::ENGINE::Task::getState(), YACS::ENGINE::Node::getStateName(), launchTask(), YACS::ENGINE::Scheduler::notifyFrom(), CORBAEngineTest::state, gui.Appli::t, YACS::TOLOAD, YACS::TORECONNECT, traceExec(), and YACS::Exception::what().

Referenced by RunA(), and RunB().

899 {
900  //First phase, make datastream connections
901  for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
902  {
903  YACS::StatesForNode state=(*iter)->getState();
904  if(state != YACS::TOLOAD && state != YACS::TORECONNECT)continue;
905  try
906  {
907  (*iter)->connectService();
908  traceExec(*iter, "connectService",ComputePlacement(*iter));
909  {//Critical section
911  (*iter)->connected();
912  }//End of critical section
913  }
914  catch(Exception& ex)
915  {
916  std::cerr << ex.what() << std::endl;
917  try
918  {
919  (*iter)->disconnectService();
920  traceExec(*iter, "disconnectService",ComputePlacement(*iter));
921  }
922  catch(...)
923  {
924  // Disconnect has failed
925  traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
926  }
927  {//Critical section
929  (*iter)->aborted();
930  _mainSched->notifyFrom(*iter,YACS::ABORT,this);
931  }//End of critical section
932  }
933  catch(...)
934  {
935  std::cerr << "Problem in connectService" << std::endl;
936  try
937  {
938  (*iter)->disconnectService();
939  traceExec(*iter, "disconnectService",ComputePlacement(*iter));
940  }
941  catch(...)
942  {
943  // Disconnect has failed
944  traceExec(*iter, "disconnectService failed, ABORT",ComputePlacement(*iter));
945  }
946  {//Critical section
948  (*iter)->aborted();
949  _mainSched->notifyFrom(*iter,YACS::ABORT,this);
950  }//End of critical section
951  }
952  if((*iter)->getState() == YACS::ERROR)
953  {
954  //try to put all coupled tasks in error
955  std::set<Task*> coupledSet;
956  (*iter)->getCoupledTasks(coupledSet);
957  for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
958  {
959  Task* t=*it;
960  if(t == *iter)continue;
961  if(t->getState() == YACS::ERROR)continue;
962  try
963  {
964  t->disconnectService();
965  traceExec(t, "disconnectService",ComputePlacement(*iter));
966  }
967  catch(...)
968  {
969  // Disconnect has failed
970  traceExec(t, "disconnectService failed, ABORT",ComputePlacement(*iter));
971  }
972  {//Critical section
974  t->aborted();
976  }//End of critical section
977  traceExec(t, "state:"+Node::getStateName(t->getState()),ComputePlacement(*iter));
978  }
979  }
980  traceExec(*iter, "state:"+Node::getStateName((*iter)->getState()),ComputePlacement(*iter));
981  }
982 
983  //Second phase, execute each task in a thread
984  for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
985  {
986  launchTask(*iter);
987  }
988 }
void Executor::loadParallelTasks ( const std::vector< Task * > &  tasks,
const Executor * execInst 
)
protected

Definition at line 876 of file Executor.cxx.

References _mainSched, _tasks, _threadStackSize, DEBTRACE, threadargs::execInst, functionForTaskLoad(), threadargs::sched, and threadargs::task.

Referenced by RunB().

877 {
878  std::vector<Thread> ths(tasks.size());
879  std::size_t ithread(0);
880  for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++, ithread++)
881  {
882  DEBTRACE("Executor::loadParallelTasks(Task *task)");
883  struct threadargs *args(new threadargs);
884  args->task = (*iter);
885  args->sched = _mainSched;
886  args->execInst = this;
887  ths[ithread].go(functionForTaskLoad, args, _threadStackSize);
888  }
889  for(ithread=0;ithread<tasks.size();ithread++)
890  ths[ithread].join();
891 }
bool Executor::loadState ( )

not yet implemented

Definition at line 669 of file Executor.cxx.

References _isRunningunderExternalControl, and DEBTRACE.

670 {
671  DEBTRACE("Executor::loadState()");
673  return true;
674 }
void Executor::loadTask ( Task * task,
const Executor * execInst 
)
protected

Perform loading of a Task.

Parameters
task: Task to load

Definition at line 824 of file Executor.cxx.

References _mainSched, _mutexForSchedulerUpdate, YACS::ABORT, YACS::ENGINE::Task::aborted(), ComputePlacement(), DEBTRACE, YACS::ENGINE::Task::getState(), YACS::ENGINE::Node::getStateName(), YACS::ENGINE::Task::initService(), YACS::ENGINE::Task::load(), YACS::ENGINE::Scheduler::notifyFrom(), YACS::START, YACS::TOLOAD, traceExec(), and YACS::Exception::what().

Referenced by functionForTaskLoad(), loadTasks(), and RunA().

825 {
826  DEBTRACE("Executor::loadTask(Task *task)");
827  if(task->getState() != YACS::TOLOAD)
828  return;
829  traceExec(task, "state:TOLOAD", ComputePlacement(task));
830  {//Critical section
832  _mainSched->notifyFrom(task,YACS::START,execInst);
833  }//End of critical section
834  try
835  {
836  traceExec(task, "load", ComputePlacement(task));
837  task->load();
838  traceExec(task, "initService", ComputePlacement(task));
839  task->initService();
840  }
841  catch(Exception& ex)
842  {
843  std::cerr << ex.what() << std::endl;
844  {//Critical section
846  task->aborted();
847  _mainSched->notifyFrom(task,YACS::ABORT,execInst);
848  traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
849  }//End of critical section
850  }
851  catch(...)
852  {
853  std::cerr << "Load failed" << std::endl;
854  {//Critical section
856  task->aborted();
857  _mainSched->notifyFrom(task,YACS::ABORT,execInst);
858  traceExec(task, "state:"+Node::getStateName(task->getState()), ComputePlacement(task));
859  }//End of critical section
860  }
861 }
void Executor::loadTasks ( const std::vector< Task * > &  tasks,
const Executor * execInst 
)
protected

Definition at line 870 of file Executor.cxx.

References _tasks, and loadTask().

871 {
872  for(std::vector<Task *>::const_iterator iter = _tasks.begin(); iter != _tasks.end(); iter++)
873  loadTask(*iter,execInst);
874 }
void Executor::notifyEndOfThread ( YACS::BASES::Thread *  thread)
protected

not implemented

Definition at line 1075 of file Executor.cxx.

1076 {
1077  /*_mutexForNbOfConcurrentThreads.lock();
1078  _groupOfAllThreadsCreated.remove(thread);
1079  delete thread;
1080  _mutexForNbOfConcurrentThreads.unlock();*/
1081 }
bool Executor::resumeCurrentBreakPoint ( )

wake up the executor when it is paused

When the Executor is in state paused or waiting for task completion, the thread running the RunB loop waits on condition _condForStepByStep. This method wakes that thread up.

Returns
true when actually wakes up executor

Definition at line 449 of file Executor.cxx.

References _condForStepByStep, _executorState, _isRunningunderExternalControl, _mutexForSchedulerUpdate, DEBTRACE, YACS::FINISHED, YACS::PAUSED, PMMLBasicsTestLauncher::ret, YACS::RUNNING, sendEvent(), YACS::STOPPED, and YACS::WAITINGTASKS.

Referenced by main(), and stopExecution().

450 {
451  DEBTRACE("Executor::resumeCurrentBreakPoint()");
452  bool ret = false;
453  //bool doDump = false;
454  { // --- Critical section
457  DEBTRACE("_executorState: " << _executorState);
458  switch (_executorState)
459  {
460  case YACS::WAITINGTASKS:
461  case YACS::PAUSED:
462  {
463  _condForStepByStep.notify_all();
465  sendEvent("executor");
466  ret = true;
467  //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
468  break;
469  }
470  case YACS::FINISHED:
471  case YACS::STOPPED:
472  {
473  //if (_dumpOnErrorRequested && _errorDetected) doDump =true;
474  DEBTRACE("Graph Execution finished or stopped !");
475  break;
476  }
477  default :
478  {
479  // debug: no easy way to verify if main loop is acutally waiting on condition
480  }
481  }
482  DEBTRACE("---");
483  //if (doDump) saveState(_dumpErrorFile);
484  } // --- End of critical section
485  return ret;
486 }
void Executor::RunA ( Scheduler graph,
int  debug = 0,
bool  fromScratch = true 
)

Execute a graph waiting for completion.

Parameters
graph: schema to execute
debug: display the graph with dot if debug > 0 (values above 1 and 2 display it at additional points)
fromScratch: if true the graph is reinitialized

Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute

Calls Executor::launchTask to execute a selected Task.

Completion when graph is finished (Scheduler::isFinished)

Definition at line 112 of file Executor.cxx.

References _displayDot(), _execMode, _isWaitingEventsFromRunningTasks, _mainSched, _mutexForSchedulerUpdate, _numberOfEndedTasks, _numberOfRunningTasks, _root, _runningTasks, _toContinue, YACS::CONTINUE, DEBTRACE, YACS::ENGINE::Scheduler::exUpdateState(), YACS::ENGINE::Scheduler::getNextTasks(), CORBAEngineTest::i, YACS::ENGINE::Scheduler::init(), YACS::ENGINE::Scheduler::isFinished(), launchTasks(), loadTask(), YACS::ENGINE::Scheduler::selectRunnableTasks(), and sleepWhileNoEventsFromAnyRunningTask().

113 {
114  DEBTRACE("Executor::RunW debug: " << debug << " fromScratch: " << fromScratch);
115  _mainSched=graph;
116  _root = dynamic_cast<ComposedNode *>(_mainSched);
117  if (!_root) throw Exception("Executor::Run, Internal Error!");
118  bool isMore;
119  int i=0;
120  if(debug>1)_displayDot(graph);
121  if (fromScratch)
122  {
123  graph->init();
124  graph->exUpdateState();
125  }
126  if(debug>1)_displayDot(graph);
127  vector<Task *> tasks;
128  vector<Task *>::iterator iter;
129  _toContinue=true;
133  _runningTasks.clear();
135  while(_toContinue)
136  {
138 
139  if(debug>2)_displayDot(graph);
140 
141  {//Critical section
143  tasks=graph->getNextTasks(isMore);
144  graph->selectRunnableTasks(tasks);
145  }//End of critical section
146 
147  if(debug>2)_displayDot(graph);
148 
149  for(iter=tasks.begin();iter!=tasks.end();iter++)
150  loadTask(*iter,this);
151 
152  if(debug>1)_displayDot(graph);
153 
154  launchTasks(tasks);
155 
156  if(debug>1)_displayDot(graph);
157 
158  {//Critical section
160  _toContinue=!graph->isFinished();
161  }//End of critical section
162  DEBTRACE("_toContinue: " << _toContinue);
163 
164  if(debug>0)_displayDot(graph);
165 
166  i++;
167  }
168 }
void Executor::RunB ( Scheduler graph,
int  debug = 0,
bool  fromScratch = true 
)

Execute a graph with breakpoints or step by step.

To be launched in a thread (main thread controls the progression).

Parameters
graph: schema to execute
debug: display the graph with dot if debug >0
fromScratch: if false, state from a previous partial execution is already loaded

Calls Scheduler::getNextTasks and Scheduler::selectRunnableTasks to select tasks to execute

Calls Executor::checkBreakPoints to verify if a pause is requested

Calls Executor::launchTask to execute a selected Task

Completion when graph is finished (Scheduler::isFinished)

States of execution:

Modes of Execution:

A breakpoint is defined by a node name. The breakpoint is reached when the node becomes ready. Step by Step means execution node by node or group of nodes by group of nodes. At a given step, the user decides to launch all the ready nodes or only a subset (Caution: some nodes must run in parallel). The next event (end of task) may give a new set of ready nodes, and define a new step.

The graph execution may be controlled by a pilot which sends requests. Requests are asynchronous. Requests are taken into account only in certain states; otherwise they return the status IgnoredRequest.

If the pilot wants to wait for the state YACS::PAUSED or YACS::WAITINGTASKS, synchronisation is obtained with:

TO BE VALIDATED:

  • Pilot may connect to executor during execution, or disconnect.
  • Several Pilots may be connected at the same time (for observation...)

Definition at line 231 of file Executor.cxx.

References _condForPilot, _displayDot(), _dumpErrorFile, _dumpOnErrorRequested, _errorDetected, _executorState, _isOKToEnd, _isWaitingEventsFromRunningTasks, _mainSched, _mutexForSchedulerUpdate, _mutexForTrace, _numberOfEndedTasks, _numberOfRunningTasks, _root, _runningTasks, _start, _tasks, _toContinue, _trace, checkBreakPoints(), DEBTRACE, YACS::ENGINE::Scheduler::exUpdateState(), FilterTasksConsideringContainers(), YACS::FINISHED, YACS::ENGINE::Scheduler::getName(), YACS::ENGINE::Scheduler::getNextTasks(), YACS::ENGINE::Scheduler::init(), YACS::INITIALISED, YACS::ENGINE::Scheduler::isFinished(), launchTasks(), loadParallelTasks(), YACS::NOTYETINITIALIZED, YACS::RUNNING, saveState(), YACS::ENGINE::Scheduler::selectRunnableTasks(), sendEvent(), sleepWhileNoEventsFromAnyRunningTask(), YACS::STOPPED, and YACS::Exception::what().

Referenced by executorFunc().

232 {
233  DEBTRACE("Executor::RunB debug: "<< graph->getName() <<" "<< debug<<" fromScratch: "<<fromScratch);
234 
235  { // --- Critical section
237  _mainSched = graph;
238  _root = dynamic_cast<ComposedNode *>(_mainSched);
239  if (!_root) throw Exception("Executor::Run, Internal Error!");
241  sendEvent("executor");
242  _toContinue=true;
243  _isOKToEnd = false;
244  _errorDetected = false;
247  _runningTasks.clear();
249  string tracefile = "traceExec_";
250  tracefile += _mainSched->getName();
251  _trace.open(tracefile.c_str());
252 #ifdef WIN32
253  _start = timeGetTime();
254 #else
255  gettimeofday(&_start, NULL);
256 #endif
257 
258  } // --- End of critical section
259 
260  if (debug > 1) _displayDot(graph);
261 
262  if (fromScratch)
263  {
264  try
265  {
266  graph->init();
267  graph->exUpdateState();
268  }
269  catch(Exception& ex)
270  {
271  DEBTRACE("exception: "<< (ex.what()));
273  sendEvent("executor");
274  throw;
275  }
276  }
278  sendEvent("executor");
279 
280  if (debug > 1) _displayDot(graph);
281 
282  vector<Task *>::iterator iter;
283  bool isMore;
284  int problemCount=0;
285  int numberAllTasks;
286 
288  sendEvent("executor");
289  while (_toContinue)
290  {
291  DEBTRACE("--- executor main loop");
293  DEBTRACE("--- events...");
294  if (debug > 2) _displayDot(graph);
295  { // --- Critical section
297  _tasks=graph->getNextTasks(isMore);
298  graph->selectRunnableTasks(_tasks);
300  numberAllTasks=_numberOfRunningTasks+_tasks.size();
301  } // --- End of critical section
302  if (debug > 2) _displayDot(graph);
304  {
305  if (checkBreakPoints()) break; // end of thread requested, OK to exit at once;
306  if (debug > 0) _displayDot(graph);
307  DEBTRACE("---");
308  //loadTasks(_tasks);
310  if (debug > 1) _displayDot(graph);
311  DEBTRACE("---");
313  DEBTRACE("---");
314  }
315  if (debug > 1) _displayDot(graph);
316  { // --- Critical section
317  DEBTRACE("---");
319  //It is possible that the graph is finished but it remains running tasks (it's an error but we must take it into account)
320  if(_numberOfRunningTasks == 0)
321  _toContinue = !graph->isFinished();
322 
323  DEBTRACE("_numberOfRunningTasks: " << _numberOfRunningTasks);
324  DEBTRACE("_numberOfEndedTasks: " << _numberOfEndedTasks);
325  DEBTRACE("_toContinue: " << _toContinue);
326  if(_toContinue && numberAllTasks==0)
327  {
328  //Problem : no running tasks and no task to launch ??
329  problemCount++;
330  std::cerr << "Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
331  //Pause to give a chance to interrupt
332  usleep(1000);
333  if(problemCount > 25)
334  {
335  // Too much problems encountered : stop execution
336  _toContinue=false;
337  }
338  }
339 
340  if (! _toContinue)
341  {
343  sendEvent("executor");
344  _condForPilot.notify_all();
345  }
346  } // --- End of critical section
347  if (debug > 0) _displayDot(graph);
348  DEBTRACE("_toContinue: " << _toContinue);
349  }
350 
351  DEBTRACE("End of main Loop");
352 
353  { // --- Critical section
355  if ( _toContinue) // --- break while(): request to stop detected on checkBreakPoints()
356  {
357  DEBTRACE("stop requested: End soon");
359  _toContinue = false;
360  sendEvent("executor");
361  }
362  } // --- End of critical section
364  {
366  }
367  {
369  _trace.close();
370  }
371  DEBTRACE("End of RunB thread");
372 }
void YACS::ENGINE::Executor::RunW ( Scheduler graph,
int  debug = 0,
bool  fromScratch = true 
)
inline

Definition at line 100 of file Executor.hxx.

Referenced by driverTest(), and main().

100 { RunB(graph, debug, fromScratch); }
bool Executor::saveState ( const std::string &  xmlFile)

save the current state of execution in an xml file

Definition at line 649 of file Executor.cxx.

References _mutexForSchedulerUpdate, _root, YACS::ENGINE::ComposedNode::accept(), YACS::ENGINE::VisitorSaveState::closeFileDump(), DEBTRACE, YACS::ENGINE::VisitorSaveState::openFileDump(), and YACS::Exception::what().

Referenced by RunB().

650 {
651  DEBTRACE("Executor::saveState() in " << xmlFile);
652  bool result = false;
653  try {
656  vst.openFileDump(xmlFile.c_str());
657  _root->accept(&vst);
658  vst.closeFileDump();
659  result = true;
660  }
661  catch(Exception& ex) {
662  std::cerr << ex.what() << std::endl;
663  }
664  return result;
665 }
void Executor::sendEvent ( const std::string &  event)
protectedvirtual

emit notification to all observers registered with the dispatcher

The dispatcher is unique and can be obtained by getDispatcher()

Definition at line 1301 of file Executor.cxx.

References _root, YACS::ENGINE::Dispatcher::dispatch(), YACS::ENGINE::Dispatcher::getDispatcher(), and YASSERT.

Referenced by checkBreakPoints(), functionForTaskExecution(), resumeCurrentBreakPoint(), and RunB().

1302 {
1304  YASSERT(disp);
1305  YASSERT(_root);
1306  disp->dispatch(_root,event);
1307 }
void YACS::ENGINE::Executor::setDPLScopeSensitive ( bool  newVal)
inline

Definition at line 104 of file Executor.hxx.

104 { _DPLScopeSensitive=newVal; }
void Executor::setExecMode ( YACS::ExecutionMode  mode)

Dynamically set the current mode of execution.

The mode can be Continue, step by step, or stop before execution of a node defined in a list of breakpoints.

Definition at line 431 of file Executor.cxx.

References _execMode, _isRunningunderExternalControl, _mutexForSchedulerUpdate, and DEBTRACE.

Referenced by main(), and stopExecution().

432 {
433  DEBTRACE("Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
434  { // --- Critical section
437  _execMode = mode;
438  } // --- End of critical section
439 }
void YACS::ENGINE::Executor::setKeepGoingProperty ( bool  newVal)
inline

Definition at line 102 of file Executor.hxx.

102 { _keepGoingOnFail=newVal; }
void Executor::setListOfBreakPoints ( std::list< std::string >  listOfBreakPoints)

define a list of nodes names as breakpoints in the graph

Definition at line 492 of file Executor.cxx.

References _isRunningunderExternalControl, _listOfBreakPoints, _mutexForSchedulerUpdate, and DEBTRACE.

493 {
494  DEBTRACE("Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
495  { // --- Critical section
498  _listOfBreakPoints = listOfBreakPoints;
499  } // --- End of critical section
500 }
bool Executor::setStepsToExecute ( std::list< std::string >  listToExecute)

Define a subset of tasks to execute in step by step mode.

Behaviour is unpredictable if the list is not a subset of the list given by Executor::getTasksToLoad in the current step. If some nodes must run in parallel, they must stay together in the list.

Definition at line 546 of file Executor.cxx.

References _executorState, _isRunningunderExternalControl, _mainSched, _mutexForSchedulerUpdate, _tasks, _tasksSave, DEBTRACE, YACS::FINISHED, YACS::ENGINE::Scheduler::getTaskName(), YACS::INITIALISED, YACS::NOTYETINITIALIZED, YACS::PAUSED, PMMLBasicsTestLauncher::ret, YACS::RUNNING, YACS::STOPPED, and YACS::WAITINGTASKS.

547 {
548  DEBTRACE("Executor::setStepsToExecute(std::list<std::string> listToExecute)");
549  bool ret = true;
550  vector<Task *>::iterator iter;
551  vector<Task *> restrictedTasks;
552  { // --- Critical section
555  switch (_executorState)
556  {
557  case YACS::WAITINGTASKS:
558  case YACS::PAUSED:
559  {
560  for (iter=_tasksSave.begin(); iter!=_tasksSave.end(); iter++)
561  {
562  string readyNode = _mainSched->getTaskName(*iter);
563  if (find(listToExecute.begin(), listToExecute.end(), readyNode)
564  != listToExecute.end())
565  {
566  restrictedTasks.push_back(*iter);
567  DEBTRACE("node to execute " << readyNode);
568  }
569  }
570  _tasks.clear();
571  for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
572  {
573  _tasks.push_back(*iter);
574  }
575  break;
576  }
578  case YACS::INITIALISED:
579  case YACS::RUNNING:
580  case YACS::FINISHED:
581  case YACS::STOPPED:
582  default:
583  {
584  break;
585  }
586  }
587  } // --- End of critical section
588 
589  _tasks.clear();
590  for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
591  {
592  _tasks.push_back(*iter);
593  }
594  for (iter=_tasks.begin(); iter!=_tasks.end(); iter++)
595  {
596  string readyNode = _mainSched->getTaskName(*iter);
597  DEBTRACE("selected node to execute " << readyNode);
598  }
599 
600  return ret;
601 }
void Executor::setStopOnError ( bool  dumpRequested = false,
std::string  xmlFile = "" 
)

ask to stop execution on the first node found in error

Parameters
dumpRequested: produce a state dump when an error is found
xmlFile: name of file used for state dump

Definition at line 400 of file Executor.cxx.

References _dumpErrorFile, _dumpOnErrorRequested, _mutexForSchedulerUpdate, _stopOnErrorRequested, and DEBTRACE.

Referenced by main().

401 {
402  { // --- Critical section
404  _dumpErrorFile=xmlFile;
406  _dumpOnErrorRequested = dumpRequested;
407  if (dumpRequested && xmlFile.empty())
408  throw YACS::Exception("dump on error requested and no filename given for dump");
409  DEBTRACE("_dumpErrorFile " << _dumpErrorFile << " " << _dumpOnErrorRequested);
410  } // --- End of critical section
411 }
void Executor::sleepWhileNoEventsFromAnyRunningTask ( )
protected

wait until a running task ends

Definition at line 1059 of file Executor.cxx.

References _condForNewTasksToPerform, _isWaitingEventsFromRunningTasks, _mutexForSchedulerUpdate, _numberOfEndedTasks, _numberOfRunningTasks, and DEBTRACE.

Referenced by RunA(), and RunB().

1060 {
1061  DEBTRACE("Executor::sleepWhileNoEventsFromAnyRunningTask()");
1062 // _semForNewTasksToPerform.wait(); //----utiliser pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
1065  {
1067  _condForNewTasksToPerform.wait(_mutexForSchedulerUpdate); // mutex released during wait
1068  }
1070  DEBTRACE("---");
1071 }
void Executor::stopExecution ( )

stops the execution as soon as possible

Definition at line 639 of file Executor.cxx.

References _isOKToEnd, resumeCurrentBreakPoint(), setExecMode(), and YACS::STEPBYSTEP.

640 {
642  //waitPause();
643  _isOKToEnd = true;
645 }
void Executor::traceExec ( Task task,
const std::string &  message,
const std::string &  placement 
)
protected

Definition at line 1274 of file Executor.cxx.

References _mainSched, _mutexForTrace, _start, _trace, YACS::ENGINE::Task::getContainer(), YACS::ENGINE::Container::getName(), and YACS::ENGINE::Scheduler::getTaskName().

Referenced by functionForTaskExecution(), launchTask(), launchTasks(), and loadTask().

1275 {
1276  string nodeName = _mainSched->getTaskName(task);
1277  Container *cont = task->getContainer();
1278  string containerName = "---";
1279  if (cont)
1280  containerName = cont->getName();
1281 
1282 #ifdef WIN32
1283  DWORD now = timeGetTime();
1284  double elapse = (now - _start)/1000.0;
1285 #else
1286  timeval now;
1287  gettimeofday(&now, NULL);
1288  double elapse = (now.tv_sec - _start.tv_sec) + double(now.tv_usec - _start.tv_usec)/1000000.0;
1289 #endif
1290  {
1292  _trace << elapse << " " << containerName << " " << placement << " " << nodeName << " " << message << endl;
1293  _trace << flush;
1294  }
1295 }
void Executor::unsetStopOnError ( )

ask not to stop execution on nodes found in error

Definition at line 417 of file Executor.cxx.

References _mutexForSchedulerUpdate, and _stopOnErrorRequested.

418 {
419  { // --- Critical section
421  _stopOnErrorRequested=false;
422  } // --- End of critical section
423 }
void Executor::waitPause ( )

suspend pilot execution until Executor is in pause or waiting tasks completion mode.

Do nothing if execution is finished or in pause. Wait for the first step if Executor is running or in initialization.

Definition at line 609 of file Executor.cxx.

References _condForPilot, _executorState, _isRunningunderExternalControl, _mutexForSchedulerUpdate, DEBTRACE, YACS::FINISHED, YACS::INITIALISED, YACS::NOTYETINITIALIZED, YACS::PAUSED, YACS::RUNNING, YACS::STOPPED, and YACS::WAITINGTASKS.

610 {
611  DEBTRACE("Executor::waitPause()" << _executorState);
612  { // --- Critical section
615  switch (_executorState)
616  {
617  default:
618  case YACS::STOPPED:
619  case YACS::FINISHED:
620  case YACS::WAITINGTASKS:
621  case YACS::PAUSED:
622  {
623  break;
624  }
626  case YACS::INITIALISED:
627  case YACS::RUNNING:
628  {
629  _condForPilot.wait(_mutexForSchedulerUpdate); // wait until executor is PAUSED or WAITINGTASKS
630  break;
631  }
632  }
633  } // --- End of critical section
634  DEBTRACE("---");
635 }
void Executor::waitResume ( )
protected

in modes Step By step or with BreakPoint, wait until pilot resumes the execution

With the condition Mutex, the mutex is released atomically during the wait. Pilot calls Executor::resumeCurrentBreakPoint to resume execution. Must be called while mutex is locked.

Definition at line 811 of file Executor.cxx.

References _condForStepByStep, _mutexForSchedulerUpdate, and DEBTRACE.

Referenced by checkBreakPoints().

812 {
813  DEBTRACE("Executor::waitResume()");
814  _condForStepByStep.wait(_mutexForSchedulerUpdate); // wait until pilot calls resumeCurrentBreakPoint()
815  DEBTRACE("---");
816 }
void Executor::wakeUp ( )
protected

must be used protected by _mutexForSchedulerUpdate!

Definition at line 1086 of file Executor.cxx.

References _condForNewTasksToPerform, _isWaitingEventsFromRunningTasks, _numberOfEndedTasks, and DEBTRACE.

Referenced by functionForTaskExecution().

1087 {
1088  DEBTRACE("Executor::wakeUp() " << _isWaitingEventsFromRunningTasks);
1090  {
1092  _condForNewTasksToPerform.notify_all();
1093  }
1094  else
1096 }

Member Data Documentation

YACS::BASES::Condition YACS::ENGINE::Executor::_condForNewTasksToPerform
protected

Definition at line 62 of file Executor.hxx.

Referenced by sleepWhileNoEventsFromAnyRunningTask(), and wakeUp().

YACS::BASES::Condition YACS::ENGINE::Executor::_condForPilot
protected

Definition at line 65 of file Executor.hxx.

Referenced by checkBreakPoints(), functionForTaskExecution(), RunB(), and waitPause().

YACS::BASES::Condition YACS::ENGINE::Executor::_condForStepByStep
protected

Definition at line 64 of file Executor.hxx.

Referenced by functionForTaskExecution(), resumeCurrentBreakPoint(), and waitResume().

bool YACS::ENGINE::Executor::_DPLScopeSensitive
protected

specifies if scope DynParaLoop is active or not. False by default.

Definition at line 90 of file Executor.hxx.

std::string YACS::ENGINE::Executor::_dumpErrorFile
protected

Definition at line 87 of file Executor.hxx.

Referenced by RunB(), and setStopOnError().

bool YACS::ENGINE::Executor::_dumpOnErrorRequested
protected

Definition at line 71 of file Executor.hxx.

Referenced by Executor(), RunB(), and setStopOnError().

bool YACS::ENGINE::Executor::_errorDetected
protected

Definition at line 72 of file Executor.hxx.

Referenced by Executor(), functionForTaskExecution(), and RunB().

YACS::ExecutionMode YACS::ENGINE::Executor::_execMode
protected
std::list< YACS::BASES::Thread * > YACS::ENGINE::Executor::_groupOfAllThreadsCreated
protected

Definition at line 85 of file Executor.hxx.

Referenced by getNbOfThreads(), and ~Executor().

bool YACS::ENGINE::Executor::_isOKToEnd
protected
bool YACS::ENGINE::Executor::_isWaitingEventsFromRunningTasks
protected

Definition at line 74 of file Executor.hxx.

Referenced by RunA(), RunB(), sleepWhileNoEventsFromAnyRunningTask(), and wakeUp().

bool YACS::ENGINE::Executor::_keepGoingOnFail
protected

Definition at line 88 of file Executor.hxx.

std::list<std::string> YACS::ENGINE::Executor::_listOfBreakPoints
protected

Definition at line 81 of file Executor.hxx.

Referenced by checkBreakPoints(), and setListOfBreakPoints().

std::list<std::string> YACS::ENGINE::Executor::_listOfTasksToLoad
protected

Definition at line 82 of file Executor.hxx.

Referenced by checkBreakPoints(), and getTasksToLoad().

Scheduler* YACS::ENGINE::Executor::_mainSched
protected
int Executor::_maxThreads
static

Definition at line 123 of file Executor.hxx.

Referenced by Executor(), launchTask(), and YACS::ENGINE::Runtime::Runtime().

YACS::BASES::Mutex YACS::ENGINE::Executor::_mutexForNbOfConcurrentThreads
protected

Definition at line 61 of file Executor.hxx.

Referenced by getNbOfThreads().

YACS::BASES::Mutex YACS::ENGINE::Executor::_mutexForTrace
protected

Definition at line 67 of file Executor.hxx.

Referenced by RunB(), and traceExec().

int YACS::ENGINE::Executor::_nbOfConcurrentThreads
protected

Definition at line 60 of file Executor.hxx.

int YACS::ENGINE::Executor::_numberOfEndedTasks
protected

Definition at line 77 of file Executor.hxx.

Referenced by RunA(), RunB(), sleepWhileNoEventsFromAnyRunningTask(), and wakeUp().

int YACS::ENGINE::Executor::_numberOfRunningTasks
protected
ComposedNode* YACS::ENGINE::Executor::_root
protected

Definition at line 59 of file Executor.hxx.

Referenced by Executor(), RunA(), RunB(), saveState(), and sendEvent().

std::set<Task *> YACS::ENGINE::Executor::_runningTasks
protected

Definition at line 76 of file Executor.hxx.

Referenced by functionForTaskExecution(), launchTask(), RunA(), and RunB().

YACS::BASES::Semaphore YACS::ENGINE::Executor::_semForMaxThreads
protected

Definition at line 63 of file Executor.hxx.

Referenced by functionForTaskExecution(), and launchTask().

int YACS::ENGINE::Executor::_semThreadCnt
protected

Definition at line 78 of file Executor.hxx.

Referenced by Executor(), functionForTaskExecution(), and launchTask().

timeval YACS::ENGINE::Executor::_start
protected

Definition at line 94 of file Executor.hxx.

Referenced by RunB(), and traceExec().

bool YACS::ENGINE::Executor::_stopOnErrorRequested
protected

Definition at line 70 of file Executor.hxx.

Referenced by Executor(), functionForTaskExecution(), setStopOnError(), and unsetStopOnError().

std::vector<Task *> YACS::ENGINE::Executor::_tasks
protected

Definition at line 83 of file Executor.hxx.

Referenced by checkBreakPoints(), loadParallelTasks(), loadTasks(), RunB(), and setStepsToExecute().

std::vector<Task *> YACS::ENGINE::Executor::_tasksSave
protected

Definition at line 84 of file Executor.hxx.

Referenced by checkBreakPoints(), and setStepsToExecute().

size_t Executor::_threadStackSize
static

Definition at line 124 of file Executor.hxx.

Referenced by launchTask(), loadParallelTasks(), and YACS::ENGINE::Runtime::Runtime().

bool YACS::ENGINE::Executor::_toContinue
protected

Definition at line 68 of file Executor.hxx.

Referenced by Executor(), isNotFinished(), RunA(), and RunB().

std::ofstream YACS::ENGINE::Executor::_trace
protected

Definition at line 86 of file Executor.hxx.

Referenced by RunB(), and traceExec().


The documentation for this class was generated from the following files: