// Compatibility shim: maps usleep(A) onto _sleep(), dividing the argument by
// 1000 (POSIX usleep() takes microseconds; _sleep() presumably takes
// milliseconds -- confirm against the target C runtime).
// The argument is parenthesized so that compound expressions such as
// usleep(a + b) are divided as a whole instead of only their last operand.
// NOTE: this is integer division when A is an integer, so requests below
// 1000 become 0.
#define usleep(A) _sleep((A)/1000)
46 #if !defined(S_ISCHR) || !defined(S_ISREG)
49 # define S_IFMT _S_IFMT
50 # define S_IFCHR _S_IFCHR
51 # define S_IFREG _S_IFREG
54 # define S_IFMT __S_IFMT
55 # define S_IFCHR __S_IFCHR
56 # define S_IFREG __S_IFREG
60 # define S_ISCHR(mode) (((mode) & S_IFMT) == S_IFCHR)
61 # define S_ISREG(mode) (((mode) & S_IFMT) == S_IFREG)
65 using namespace YACS::ENGINE;
68 using YACS::BASES::Mutex;
69 using YACS::BASES::Thread;
70 using YACS::BASES::Semaphore;
78 Executor::Executor():_nbOfConcurrentThreads(0), _semForMaxThreads(_maxThreads),_keepGoingOnFail(false),_DPLScopeSensitive(false)
114 DEBTRACE(
"Executor::RunW debug: " << debug <<
" fromScratch: " << fromScratch);
127 vector<Task *> tasks;
128 vector<Task *>::iterator iter;
149 for(iter=tasks.begin();iter!=tasks.end();iter++)
233 DEBTRACE(
"Executor::RunB debug: "<< graph->
getName() <<
" "<< debug<<
" fromScratch: "<<fromScratch);
249 string tracefile =
"traceExec_";
251 _trace.open(tracefile.c_str());
255 gettimeofday(&
_start, NULL);
282 vector<Task *>::iterator iter;
330 std::cerr <<
"Problem in Executor : no running tasks and no task to launch ?? problemCount=" << problemCount << std::endl;
333 if(problemCount > 25)
357 DEBTRACE(
"stop requested: End soon");
407 if (dumpRequested && xmlFile.empty())
408 throw YACS::Exception(
"dump on error requested and no filename given for dump");
433 DEBTRACE(
"Executor::setExecMode(YACS::ExecutionMode mode) " << mode);
451 DEBTRACE(
"Executor::resumeCurrentBreakPoint()");
474 DEBTRACE(
"Graph Execution finished or stopped !");
494 DEBTRACE(
"Executor::setListOfBreakPoints(std::list<std::string> listOfBreakPoints)");
510 DEBTRACE(
"Executor::getTasksToLoad()");
511 list<string> listOfNodesToLoad;
512 listOfNodesToLoad.clear();
535 return listOfNodesToLoad;
548 DEBTRACE(
"Executor::setStepsToExecute(std::list<std::string> listToExecute)");
550 vector<Task *>::iterator iter;
551 vector<Task *> restrictedTasks;
563 if (find(listToExecute.begin(), listToExecute.end(), readyNode)
564 != listToExecute.end())
566 restrictedTasks.push_back(*iter);
567 DEBTRACE(
"node to execute " << readyNode);
571 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
590 for (iter=restrictedTasks.begin(); iter!=restrictedTasks.end(); iter++)
597 DEBTRACE(
"selected node to execute " << readyNode);
651 DEBTRACE(
"Executor::saveState() in " << xmlFile);
662 std::cerr << ex.
what() << std::endl;
680 if (stat(filename, &buf) != 0)
682 if (!S_ISREG(buf.st_mode))
702 std::ofstream g(
"titi");
705 const char displayScript[]=
"display.sh";
707 system(
"sh display.sh");
709 system(
"dot -Tpng titi|display -delay 5");
725 DEBTRACE(
"Executor::checkBreakPoints()");
726 vector<Task *>::iterator iter;
727 bool endRequested =
false;
769 if (stop)
DEBTRACE(
"wake up from waitResume");
795 DEBTRACE(
"wake up from waitResume");
799 DEBTRACE(
"endRequested: " << endRequested);
826 DEBTRACE(
"Executor::loadTask(Task *task)");
843 std::cerr << ex.
what() << std::endl;
853 std::cerr <<
"Load failed" << std::endl;
872 for(std::vector<Task *>::const_iterator iter =
_tasks.begin(); iter !=
_tasks.end(); iter++)
878 std::vector<Thread> ths(tasks.size());
879 std::size_t ithread(0);
880 for(std::vector<Task *>::const_iterator iter =
_tasks.begin(); iter !=
_tasks.end(); iter++, ithread++)
882 DEBTRACE(
"Executor::loadParallelTasks(Task *task)");
884 args->
task = (*iter);
889 for(ithread=0;ithread<tasks.size();ithread++)
901 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
907 (*iter)->connectService();
911 (*iter)->connected();
916 std::cerr << ex.
what() << std::endl;
919 (*iter)->disconnectService();
935 std::cerr <<
"Problem in connectService" << std::endl;
938 (*iter)->disconnectService();
955 std::set<Task*> coupledSet;
956 (*iter)->getCoupledTasks(coupledSet);
957 for (std::set<Task*>::iterator it = coupledSet.begin(); it != coupledSet.end(); ++it)
960 if(t == *iter)
continue;
984 for(vector<Task *>::const_iterator iter=tasks.begin();iter!=tasks.end();iter++)
1001 DEBTRACE(
"Executor::launchTask(Task *task)");
1012 std::set<Task*>::iterator it = tmpSet.begin();
1013 std::string status=
"running";
1014 std::set<Task*> coupledSet;
1015 while( it != tmpSet.end() )
1021 for (std::set<Task*>::iterator iter = coupledSet.begin(); iter != coupledSet.end(); ++iter)
1024 tmpSet.erase(*iter);
1026 if(status==
"running")
break;
1027 it = tmpSet.begin();
1030 if(status==
"toactivate")
1032 std::cerr <<
"WARNING: maybe you need more threads to run your schema (current value="<<
_maxThreads <<
")" << std::endl;
1033 std::cerr <<
"If it is the case, set the YACS_MAX_THREADS environment variable to a bigger value (export YACS_MAX_THREADS=xxx)" << std::endl;
1061 DEBTRACE(
"Executor::sleepWhileNoEventsFromAnyRunningTask()");
1114 DEBTRACE(
"Executor::functionForTaskLoad(void *arg)");
1139 DEBTRACE(
"Executor::functionForTaskExecution(void *arg)");
1154 Node *node(dynamic_cast<Node *>(task));
1155 ComposedNode *gfn(dynamic_cast<ComposedNode *>(sched));
1156 if(node!=0 && gfn!=0)
1169 std::cerr <<
"YACS Exception during execute" << std::endl;
1170 std::cerr << ex.
what() << std::endl;
1172 string message =
"end execution ABORT, ";
1173 message += ex.
what();
1179 std::cerr <<
"Execution has failed: unknown reason" << std::endl;
1187 DEBTRACE(
"task->disconnectService()");
1194 std::cerr <<
"disconnect has failed" << std::endl;
1234 std::cerr <<
"Error during notification" << std::endl;
1235 std::cerr << ex.
what() << std::endl;
1241 std::cerr <<
"Notification failed" << std::endl;
1246 <<
" _execMode: " << execInst->
_execMode
1278 string containerName =
"---";
1280 containerName = cont->
getName();
1283 DWORD now = timeGetTime();
1284 double elapse = (now -
_start)/1000.0;
1287 gettimeofday(&now, NULL);
1288 double elapse = (now.tv_sec -
_start.tv_sec) +
double(now.tv_usec -
_start.tv_usec)/1000000.0;
1292 _trace << elapse <<
" " << containerName <<
" " << placement <<
" " << nodeName <<
" " << message << endl;
1317 std::map<HomogeneousPoolContainer *, std::vector<Task *> >
m;
1318 for(std::vector<Task *>::const_iterator it=tsks.begin();it!=tsks.end();it++)
1335 m[contC].push_back(cur);
1338 std::vector<Task *>
ret;
1342 const std::vector<Task *>& curtsks((*it).second);
1345 ret.insert(ret.end(),curtsks.begin(),curtsks.end());
1351 std::vector<const Task *> vecOfTaskSharingSameHPContToBeRunSimutaneously;
1353 std::vector<Task *>::const_iterator it2(curtsks.begin());
1354 for(std::size_t
i=0;
i<sz && it2!=curtsks.end();
i++,it2++)
1356 vecOfTaskSharingSameHPContToBeRunSimutaneously.push_back(*it2);
1357 ret.push_back(*it2);
1359 curhpc->
allocateFor(vecOfTaskSharingSameHPContToBeRunSimutaneously);
1369 std::string placement(
"---");