PLUGIN serial lang: "C++" version: "1.0" date: "2022-11-01" author: "Julien BRUGUIER" maintainer: "Julien BRUGUIER " synopsis: "An efficient scheduler for tasking." description: %{ This plugin adds a scheduler to the Simple Virtual Machine dedicated to execute tasks. .P The controlled processes are executed in parallel within a given limit. .P The limit of executed processes in parallel can be tuned by a dedicated instruction. %} includes: %{ #include #include #include %} startup: %{ SVM_Value_Integer l = ::svm_plugin_get_option(svm,CONST_PEP(serial,limit)); if(not ::svm_value_state_is_null(svm,l)) { auto rl = ::svm_value_integer_get(svm,l); if(rl>0) { ::svm_scheduler_notify__raw(svm,::svm_scheduler_get(svm,CONST_PEP(serial,scheduler)),rl); } } %} DEFINE OPTION serial.limit -l INT help: "Starts serial scheduler with a process limit. Shall be above 0. When not present, serial uses 1 as a limit." SCHEDULER serial.scheduler %{ enum class List { R, S, W, O, N }; List remove(const SVM_Process p) { std::list::iterator it = std::find(_running.begin(),_running.end(),p); if(it!=_running.end()) { _running.erase(it); return List::R; } it = std::find(_suspended.begin(),_suspended.end(),p); if(it!=_suspended.end()) { _suspended.erase(it); return List::S; } it = std::find(_waiting.begin(),_waiting.end(),p); if(it!=_waiting.end()) { _waiting.erase(it); return List::W; } it = std::find(_others.begin(),_others.end(),p); if(it!=_others.end()) { _others.erase(it); return List::O; } return List::N; } void print(const void *svm, std::ostream& os) const { os << "Limit " << _limit << std::endl << "Desired running:" << std::endl; scheduler_scheduler::print(svm,_running,os); os << "Desired suspended:" << std::endl; scheduler_scheduler::print(svm,_suspended,os); os << "Desired waiting:" << std::endl; scheduler_scheduler::print(svm,_waiting,os); os << "Desired others:" << std::endl; scheduler_scheduler::print(svm,_others,os); } static void print(const void *svm, const std::list& l, std::ostream& os) { for(const auto& p: l) { SVM_String s = ::svm_process_print(svm,p); os << " " << std::string(s.string,s.size) << std::endl; } } static void local(const void *svm, std::list& l) { for(auto& p: l) { ::svm_variable_scope_set_local(svm,p); } } void command(const void *svm) { if(_running.size()>_limit) { auto it = _running.begin(); for(size_t i=0 ; i<_limit ; ++i) { ++it; } size_t s = _suspended.size(); std::move(it,_running.end(),std::back_inserter(_suspended)); _running.erase(it,_running.end()); for(auto p: _suspended) { if(s==0) { ::svm_process_suspend(svm,p); } else { --s; } } } else if(_running.size()<_limit) { auto it = _suspended.begin(); for(size_t i=_running.size() ; i<_limit ; ++i) { if(it==_suspended.end()) break; ++it; } size_t s = _running.size(); std::move(_suspended.begin(),it,std::back_inserter(_running)); _suspended.erase(_suspended.begin(),it); for(auto p: _running) { if(s==0) { ::svm_process_run__raw(svm,p,0); } else { --s; } } } } std::list _running; std::list _suspended; std::list _waiting; std::list _others; size_t _limit = 1; %} create default: %{} delete default: %{ scheduler_scheduler::local(svm,object->_running); scheduler_scheduler::local(svm,object->_suspended); scheduler_scheduler::local(svm,object->_waiting); scheduler_scheduler::local(svm,object->_running); %} schedule object: %{ auto l = object->remove(process); if(l==scheduler_scheduler::List::N) return 0; switch(state) { case RUNNING: case DEBUG: { object->_running.push_back(process); } break; case SUSPENDED: { if((l==scheduler_scheduler::List::R) or (l==scheduler_scheduler::List::W)) { object->_suspended.push_front(process); } else { object->_suspended.push_back(process); } } break; case LOCKED: case WAITING: { object->_waiting.push_back(process); } break; case ZOMBIE: case INTERRUPTED: case ERROR: { object->_others.push_back(process); } break; } object->command(svm); return 0; %} notification object: %{ if(parameter<1) { ERROR_INTERNAL(FAILURE,"Invalid process limit"); } object->_limit = parameter; object->command(svm); return 0; %} attach object: %{ ::svm_variable_scope_set_global(svm,process); object->_others.push_back(process); return TRUE; %} detach object: %{ auto l = object->remove(process); if(l==scheduler_scheduler::List::N) return FALSE; ::svm_variable_scope_set_local(svm,process); return TRUE; %} print object: %{ std::ostringstream os; object->print(svm,os); return ::svm_string_new(svm,os.str().c_str(),os.str().size()); %} help: %{ The scheduler accepts processes without any limitation. .P It will execute processes in parallel until a certain limit. This limit is 1 by default, but can be modified by the serial.limit instruction. .P When a process ends, another suspended process is executed at its place if available. When a process is put on hold (waiting or locked), another suspended process is executed at its place as well. The process put on hold will be restarted as soon as possible by the scheduler. %} SYSTEM INSTRUCTION serial.limit INT %{ auto limit = ARGV_VALUE(0,integer); if(limit<1) { ERROR_INTERNAL(FAILURE,"Serial limit shall be above 1"); } ::svm_scheduler_notify__raw(svm,::svm_scheduler_get(svm,CONST_PEP(serial,scheduler)),limit); %} help: %{ This instruction changes the limit of parallel processes executed by the serial.scheduler. .P The instruction raises a FAILURE interruption if the limit is under 1. %}