PLUGIN tcp lang: "C++" version: "1.1" date: "2024-05-01" author: "Julien BRUGUIER" maintainer: "Julien BRUGUIER " synopsis: "A fast implementation of TCP, compatible with com plugin." description: %{ This plugin allows simple and fast management of TCP devices, compatible with com plugin. .P It eases the creation of applications based on a TCP server or a TCP client. %} seealso: %{ .BR svm_plugin_com (7) for com plugin documentation. %} example: "Simple server" %{ .nf #!/usr/bin/env svm LOG PLUGIN "svmcom.so" PLUGIN "===PLUGINLIB===" -l 3 -t 2000 PLUGIN "svmstr.so" OPTION -i STR host ARGUMENT STR service PROCESS "server" CODE "main" INLINE :goto has_host :when &host INITIALISED "localhost" -> &host :label has_host :tcp.server @&host @&service 5 > "client" $"client" P tcp.scheduler :symbol client :memory STR/s :com.read @&P com.line -> &s :shutdown :unless &s INITIALISED :str.replace @&s ALL CONST str.pattern "\n" => "" :str.join "<" @&s ">\n" -> &s :com.write @&P @&s END SCHEDULER tcp.scheduler MEMORY host service END .fi %} example: "Simple client" %{ .nf #!/usr/bin/env svm LOG PLUGIN "svmcom.so" PLUGIN "===PLUGINLIB===" PLUGIN "svmstr.so" OPTION -i STR host ARGUMENT STR service PROCESS "client" CODE "main" INLINE :goto has_host :when &host INITIALISED "localhost" -> &host :label has_host :memory com.device/c, STR/s :com.open tcp.client @&host @&service -> &c :com.write @&c "lol\n" :com.read @&c com.line -> &s :str.replace @&s ALL CONST str.pattern "\n" => "" :com.message @&s END MEMORY host service END .fi %} includes: %{ #include #include #include #include #include #include #include #include #include #include %} code: %{ #define BUFFER_SIZE 4096 std::string strerror_local(int e) { return ::strerror(e); } std::string gai_strerror_local(int e) { return ::gai_strerror(e); } struct Memoire { Memoire() { res0 = nullptr; } ~Memoire() { if(res0) ::freeaddrinfo(res0); } struct addrinfo *res0; }; bool resolution_ip_port(const struct sockaddr* sock, const socklen_t 
longueur, std::string& ip, std::string& port)
{
	// Converts a socket address into textual host and port.
	// Returns true and fills ip/port on success; leaves both untouched on failure.
	char host[BUFFER_SIZE+1];
	char serv[BUFFER_SIZE+1];
	// NI_NUMERICHOST avoids a reverse DNS lookup; NI_NUMERICSERV keeps the port
	// numeric instead of mapping it to a service name.
	const bool resultat = ::getnameinfo(sock,longueur,host,BUFFER_SIZE,serv,BUFFER_SIZE,NI_NUMERICHOST|NI_NUMERICSERV)==0;
	if(resultat)
	{
		ip = host;
		port = serv;
	}
	return resultat;
}

// Appends one line "<fonction> (<ip> : <port>): <system error text>" to os.
// The address part is omitted when the address cannot be converted to text.
void log_erreur(struct addrinfo *res, std::ostringstream& os, const std::string& fonction, int erreur)
{
	std::string ip;
	std::string port;
	os << std::endl << fonction << " ";
	if(resolution_ip_port(res->ai_addr,res->ai_addrlen,ip,port))
	{
		os << "(" << ip << " : " << port << ")";
	}
	os << ": " << ::strerror_local(erreur);
}

// Opens a socket of the given type on ip:port and returns its descriptor.
//
// Template arguments select the calls applied to each candidate address, in this order:
//   appelle_bind    : call bind() on the address,
//   appelle_listen  : call listen() with tcp_file as backlog,
//   appelle_connect : call connect() to the address.
//
// Every address returned by getaddrinfo() is tried in turn; the first one passing all
// requested steps wins. Each failure is accumulated in the error message, which is
// raised as a DEVICE error when no address works.
// NOTE(review): the raise function presumably does not return - confirm against the SVM API.
template<bool appelle_bind, bool appelle_listen, bool appelle_connect>
static int ouvrir(const void *svm, const int type, const std::string& ip, const std::string& port, const int tcp_file)
{
	Memoire memoire; // releases the getaddrinfo() result on every exit path
	std::ostringstream details;
	details << "Unable to open TCP socket.";

	struct addrinfo hints;
	::memset(&hints, 0, sizeof(hints));
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = type;
	const int error = ::getaddrinfo(ip.c_str(), port.c_str(), &hints, &memoire.res0);
	if (error)
	{
		details << std::endl << "getaddrinfo: " << ::gai_strerror_local(error);
		::svm_processor_current_raise_error_internal__raw(svm,DEVICE,details.str().c_str());
	}

	int fd = -1;
	for (struct addrinfo *res = memoire.res0 ; res ; res = res->ai_next)
	{
		fd = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (fd < 0)
		{
			log_erreur(res,details,"socket",errno);
			continue;
		}
		if(appelle_bind)
		{
			if (::bind(fd, res->ai_addr, res->ai_addrlen) < 0)
			{
				// errno is read before close() can overwrite it
				log_erreur(res,details,"bind",errno);
				::close(fd);
				fd = -1;
				continue;
			}
		}
		if(appelle_listen)
		{
			if(::listen(fd,tcp_file)<0)
			{
				log_erreur(res,details,"listen",errno);
				::close(fd);
				fd = -1;
				continue;
			}
		}
		if(appelle_connect)
		{
			if (::connect(fd, res->ai_addr, res->ai_addrlen) < 0)
			{
				log_erreur(res,details,"connect",errno);
				::close(fd);
				fd = -1;
				continue;
			}
		}
		break;
	}
	if (fd < 0)
	{
		details << std::endl;
		::svm_processor_current_raise_error_internal__raw(svm,DEVICE,details.str().c_str());
	}
	return fd;
}
%}

test: "Simple_exchange"
%{
PLUGIN "svmcom.so"
PLUGIN "svmint.so" PLUGIN "svmrun.so" PLUGIN "svmstr.so" PROCESS "s" CODE "main" INLINE :memory STR/s [ "abc" ] -> s :tcp.server "localhost" "31415" 5 > "sc" $"client" s run.parallel :shutdown 0 :symbol client :memory STR/s, INT/i :com.read @&P com.size 2 -> &s :int.parse @&s -> &i :int.mul @&i @&i -> &i :com.write @&P @&i " " @&@(P/1) "\n" END AUTOTERMINATED END PROCESS "c" CODE "main" INLINE :memory com.device/c, STR/s, BLN/b :com.open tcp.client "localhost" "31415" -> &c :com.write @&c 17 :com.read @&c com.line -> &s :str.cmp @&s = "289 abc\n" -> &b :shutdown 1 :unless @&b TRUE END SCHEDULER run.parallel END %} test: "No_client" %{ PLUGIN "svmcom.so" PLUGIN "svmrun.so" PROCESS "s" CODE "main" INLINE :memory STR/s [ "abc" ] -> s :tcp.server "localhost" "31416" 5 > "sc" $"client" s run.parallel :shutdown 0 :symbol client :shutdown 1 END AUTOTERMINATED END PROCESS "S" CODE "main" INLINE :run.sleep HARD 1 END SCHEDULER run.parallel END %} USE TYPE com.device help: %{ This plugin can create com.device values to allow read, write and idle operations. %} FUNCTION com.device $tcp.client -> com.device ? help: %{ This function is used to create a com.device from a tcp.client channel. %} INTERRUPTION com.interrupted help: %{ Read operation can raise this interruption while interrupted. %} DEFINE STRUCT tcp.client %{ struct_client(const std::string& host, const std::string& port) :_host(host),_port(port) {} std::string _host; std::string _port; int _socket {0}; bool _blocking{true}; bool _read_half_closed{false}; bool _write_half_closed{false}; %} delete default: %{} help: %{ This structure is the tcp.client channel, containing data for socket management. 
%}

FUNCTION tcp.device_client_open STR:host STR:service -> $tcp.client
%{
	// Connects to host:service and wraps the connected descriptor into a tcp.client structure.
	SVM_String shost = ARGV_VALUE(0,string);
	SVM_String sservice = ARGV_VALUE(1,string);
	// Template arguments: no bind, no listen, connect. The backlog argument is unused here.
	int fd = ouvrir<false,false,true>(svm,SOCK_STREAM,RAW_STRING(shost),RAW_STRING(sservice),0);
	struct_client *client = new struct_client(RAW_STRING(shost),RAW_STRING(sservice));
	client->_socket = fd;
	return NEW_STRUCT(tcp,client,client);
%}
help:
%{
This function is used by :com.open instruction.
.P
This creates a tcp.client device used by clients to connect to servers.
%}

FUNCTION tcp.device_client_print $tcp.client -> STR
%{
	// Builds "TCP client <host>:<port> (<descriptor>)", followed by "non blocking" when applicable.
	struct_client *client = ARGV_STRUCT(0,tcp,client);
	std::ostringstream oss;
	oss << "TCP client " << client->_host << ":" << client->_port << " (" << client->_socket << ")" << (client->_blocking?"":"non blocking");
	return NEW_VALUE(string,NEW_STRING(oss.str()));
%}
help:
%{
This function is used to create a string representation of the tcp.client device.
%}

FUNCTION tcp.device_client_read $tcp.client -> STR ?
%{
	// Reads at most BUFFER_SIZE bytes from the socket.
	// Returns the bytes read, an empty string when a non blocking socket has no data,
	// or a null string when recv() reports 0 bytes.
	struct_client *client = ARGV_STRUCT(0,tcp,client);
	if(client->_read_half_closed)
	{
		ERROR_INTERNAL(DEVICE,"Can not read from closed device");
	}
	char buffer[BUFFER_SIZE+1];
	::svm_process_pause(svm);
	::svm_process_interruptionnotification_enable(svm,CURRENT(process));
	const ssize_t received = ::recv(client->_socket,buffer,BUFFER_SIZE,0);
	// errno is captured immediately: the SVM calls below may overwrite it
	// before the failure is analysed.
	const int recv_errno = errno;
	::svm_process_interruptionnotification_disable(svm,CURRENT(process));
	::svm_process_resume(svm);
	if(received<0)
	{
		if(recv_errno==EINTR)
		{
			ERROR_EXTERNAL(com,interrupted,"Read interrupted.");
		}
		if(((recv_errno==EAGAIN) or (recv_errno==EWOULDBLOCK)) and not client->_blocking)
		{
			return ::svm_value_string_new__raw(svm,"");
		}
		std::ostringstream oss;
		oss << "Read error on TCP socket: " << ::strerror_local(recv_errno);
		ERROR_INTERNAL(DEVICE,oss.str().c_str());
	}
	if(received==0)
	{
		return NEW_NULL_VALUE(string);
	}
	return ::svm_value_string_new__buffer(svm,buffer,received);
%}
help:
%{
This function is used by :com.read instruction on a tcp.client device.
.P
The function can raise a com.interrupted interruption when an interruption notification is caught.
%}

FUNCTION tcp.device_client_write $tcp.client STR
%{
	// Sends the whole string in a single send() call; a short write is reported as an error.
	struct_client *client = ARGV_STRUCT(0,tcp,client);
	if(client->_write_half_closed)
	{
		ERROR_INTERNAL(DEVICE,"Can not write to closed device");
	}
	SVM_String text = ARGV_VALUE(1,string);
	// MSG_NOSIGNAL: a closed peer produces an error instead of a SIGPIPE signal
	const ssize_t written = ::send(client->_socket,text.string,text.size,MSG_NOSIGNAL);
	if(written<0)
	{
		const int send_errno = errno;
		std::ostringstream oss;
		oss << "Write error on TCP socket: " << ::strerror_local(send_errno);
		ERROR_INTERNAL(DEVICE,oss.str().c_str());
	}
	if(static_cast<size_t>(written)!=text.size)
	{
		ERROR_INTERNAL(DEVICE,"Write error on TCP socket: Incomplete write to device.");
	}
%}
help:
%{
This function is used by the :com.write instruction on a tcp.client device.
%}

FUNCTION tcp.device_client_idle $tcp.client MUTABLE INT 3
%{
	// Stores the socket descriptor into each of the three mutable integer parameters.
	// NOTE(review): presumably the read/write/exception descriptors expected by :com.idle - confirm against the com plugin.
	struct_client *client = ARGV_STRUCT(0,tcp,client);
	for(size_t i=1 ; i<4 ; ++i)
	{
		SVM_Value_Integer c = ::svm_parameter_value_get(svm,argv[i]);
		::svm_value_integer_set(svm,c,client->_socket);
	}
%}
help:
%{
This function is used by the :com.idle instruction on a tcp.client device.
%}

FUNCTION tcp.device_client_command $tcp.client . * -> VALUE ?
%{ struct_client *client = ARGV_STRUCT(0,tcp,client); for(SVM_Index index=1 ; index=argc) or (::svm_parameter_type_is_marker(svm,argv[index])==FALSE)) { ERROR_INTERNAL(FAILURE,"Invalid close command"); } SVM_String rdirection = ::svm_parameter_marker_get(svm,argv[index]); auto direction = RAW_STRING(rdirection); if(direction==">") { ::shutdown(client->_socket,SHUT_WR); client->_write_half_closed = true; } else if(direction=="<") { ::shutdown(client->_socket,SHUT_RD); client->_read_half_closed = true; } else { ERROR_INTERNAL(FAILURE,"Invalid close command"); } } else if(command=="LINGER") { ++index; if(index>=argc) { ERROR_INTERNAL(FAILURE,"Invalid linger command"); } linger linger; linger.l_onoff = 0; linger.l_linger = 0; if(::svm_parameter_type_is_value(svm,argv[index])==TRUE) { SVM_Value rvalue = ::svm_parameter_value_get(svm,argv[index]); if(::svm_value_type_is_integer(svm,rvalue)==FALSE) { ERROR_INTERNAL(FAILURE,"Invalid linger command"); } auto l = ::svm_value_integer_get(svm,rvalue); if(l<0) { ERROR_INTERNAL(FAILURE,"Invalid linger command"); } linger.l_onoff = 1; linger.l_linger = l; } else if(::svm_parameter_type_is_keyword(svm,argv[index])==TRUE) { SVM_String roff = ::svm_parameter_keyword_get(svm,argv[index]); if(RAW_STRING(roff)!="OFF") { ERROR_INTERNAL(FAILURE,"Invalid linger command"); } } else { ERROR_INTERNAL(FAILURE,"Invalid linger command"); } if(::setsockopt(client->_socket,SOL_SOCKET,SO_LINGER,&linger,sizeof(linger))) { std::ostringstream oss; oss << "Unable to set linger mode on TCP socket." 
<< std::endl << ::strerror_local(errno) << std::endl; ERROR_INTERNAL(DEVICE,oss.str().c_str()); } } else if(command=="BLOCKING") { ++index; if(index>=argc) { ERROR_INTERNAL(FAILURE,"Invalid blocking command"); } if(::svm_parameter_type_is_value(svm,argv[index])==FALSE) { ERROR_INTERNAL(FAILURE,"Invalid blocking command"); } SVM_Value rtrigger = ::svm_parameter_value_get(svm,argv[index]); if(::svm_value_type_is_boolean(svm,rtrigger)==FALSE) { ERROR_INTERNAL(FAILURE,"Invalid blocking command"); } SVM_Boolean trigger = ::svm_value_boolean_get(svm,rtrigger); client->_blocking=trigger==TRUE; if(client->_blocking) { int flags = ::fcntl(client->_socket, F_GETFL, 0); ::fcntl(client->_socket, F_SETFL, flags bitand (compl O_NONBLOCK)); } else { int flags = ::fcntl(client->_socket, F_GETFL, 0); ::fcntl(client->_socket, F_SETFL, flags bitor O_NONBLOCK); } } else if(command=="NODELAY") { ++index; if(index>=argc) { ERROR_INTERNAL(FAILURE,"Invalid nodelay command"); } if(::svm_parameter_type_is_value(svm,argv[index])==FALSE) { ERROR_INTERNAL(FAILURE,"Invalid nodelay command"); } SVM_Value rtrigger = ::svm_parameter_value_get(svm,argv[index]); if(::svm_value_type_is_boolean(svm,rtrigger)==FALSE) { ERROR_INTERNAL(FAILURE,"Invalid nodelay command"); } SVM_Boolean trigger = ::svm_value_boolean_get(svm,rtrigger); int itrigger = trigger?1:0; ::setsockopt( client->_socket, IPPROTO_TCP, TCP_NODELAY, (void *)&itrigger, sizeof(itrigger)); } else if(command=="REMOTE") { ++index; if(index>=argc) { ERROR_INTERNAL(FAILURE,"Invalid remote command"); } if(::svm_parameter_type_is_keyword(svm,argv[index])==FALSE) { ERROR_INTERNAL(FAILURE,"Invalid remote command"); } SVM_String rattr = ::svm_parameter_keyword_get(svm,argv[index]); auto attr = RAW_STRING(rattr); if(attr=="IP") { return NEW_VALUE(string,NEW_STRING(client->_host)); } else if(attr=="PORT") { return NEW_VALUE(string,NEW_STRING(client->_port)); } else { ERROR_INTERNAL(FAILURE,"Invalid remote command"); } } else { 
ERROR_INTERNAL(FAILURE,"Invalid command name"); } } %} help: %{ This function is used by the :com.command instruction on a tcp.client device. .P It supports these commands: - 'CLOSE' [ < > ] for socket half closure, - 'LINGER' [ 'OFF' INT ] for TCP linger mode (OFF to disable, an integer to enable), - 'BLOCKING' BLN for socket in blocking or non-blocking mode, - 'NODELAY' BLN for TCP Nagle disabling or enabling, - 'REMOTE' [ 'IP' 'PORT' ] to get information on the remote peer of a tcp.client device. %} FUNCTION tcp.device_client_close $tcp.client -> BLN %{ struct_client *client = ARGV_STRUCT(0,tcp,client); ::shutdown(client->_socket,SHUT_RDWR); ::close(client->_socket); return NEW_VALUE(boolean,TRUE); %} help: %{ This function is used by the com.device destructor on a tcp.client device. %} WAITING INSTRUCTION tcp.server STR:host STR:service INT:queue > STR:process_name SYM:code PTR:memory PEP:scheduler PEP:sequencer ? %{ SVM_String shost = ARGV_VALUE(0,string); SVM_String sservice = ARGV_VALUE(1,string); long long int client_queue_size = ARGV_VALUE(2,integer); if(client_queue_size<1) { ERROR_INTERNAL(FAILURE,"Invalid client queue size"); } SVM_String sclient = ARGV_VALUE(4,string); SVM_Value_Symbol code = ::svm_parameter_value_get(svm,argv[5]); SVM_Value_Pointer memory = ::svm_parameter_value_get(svm,argv[6]); SVM_Value_PluginEntryPoint scheduler = ::svm_parameter_value_get(svm,argv[7]); SVM_Value_PluginEntryPoint sequencer = nullptr; if(argc>8) { sequencer = ::svm_parameter_value_get(svm,argv[8]); } int fd_server = ouvrir(svm,SOCK_STREAM,RAW_STRING(shost),RAW_STRING(sservice),client_queue_size); for(;;) { errno=0; ::svm_process_interruptionnotification_enable(svm,CURRENT(process)); struct sockaddr sockclient; socklen_t l = sizeof(sockclient); int fd_client = ::accept(fd_server,&sockclient,&l); ::svm_process_interruptionnotification_disable(svm,CURRENT(process)); if(fd_client<0) { if(errno==EINTR) { ::shutdown(fd_server,SHUT_RDWR); ::close(fd_server); RETURN; } 
else { std::ostringstream oss; oss << "Error raised while waiting for client:" << ::strerror_local(errno); ERROR_INTERNAL(DEVICE,oss.str().c_str()); } } std::string remote_host; std::string remote_port; resolution_ip_port(&sockclient,l,remote_host,remote_port); struct_client *rclient = new struct_client(remote_host,remote_port); rclient->_socket = fd_client; SVM_Structure client = NEW_STRUCT(tcp,client,rclient); SVM_Parameter *create_params = ::svm_parameter_array_new(svm,1); create_params[0] = ::svm_parameter_structure_new(svm,client); SVM_Value_Plugin device = ::svm_function_call(svm,CONST_PEP(com,device),1,create_params); VARIABLE_DELETE(client); ::svm_value_state_set_movable(svm,device); VARIABLE_DELETE(create_params); SVM_Kernel kernel = ::svm_kernel_new_symbol(svm,TRUE,FALSE,FALSE,nullptr,code); SVM_Value_Pointer *all_memory = ::svm_memory_pointer_list_accessible(svm,CURRENT(kernel),memory); for(SVM_Value_Pointer *it_memory=all_memory ; *it_memory ; ++it_memory) { SVM_Address address = ::svm_value_pointer_get_address(svm,*it_memory); SVM_Size size = ::svm_value_pointer_get_size(svm,*it_memory); SVM_Memory_Zone zone = ::svm_memory_zone_new(svm); ::svm_memory_zone_append_internal__raw(svm,zone,AUTOMATIC,size); ::svm_memory_allocate_address(svm,kernel,zone,address); VARIABLE_DELETE(zone); ::svm_memory_share(svm,CURRENT(kernel),*it_memory,kernel,*it_memory); } SVM_Memory_Zone zone = ::svm_memory_zone_new(svm); ::svm_memory_zone_append_external__raw(svm,zone,CONST_PEP(com,device),1); ::svm_memory_zone_append_internal__raw(svm,zone,POINTER,1); SVM_Value_Pointer pointer = ::svm_memory_allocate(svm,kernel,zone); ::svm_value_state_set_movable(svm,pointer); SVM_Address address = ::svm_value_pointer_get_address(svm,pointer); ::svm_memory_write_address(svm,kernel,address,device); ::svm_memory_write_address(svm,kernel,address+1,all_memory[0]); ::svm_processor_set_currentpointer(svm,kernel,pointer); VARIABLE_DELETE(zone); VARIABLE_DELETE(pointer); VARIABLE_DELETE(device); 
VARIABLE_DELETE(all_memory); SVM_Process process = ::svm_process_new__string(svm,sclient,sequencer,FALSE,kernel); VARIABLE_DELETE(kernel); SVM_Process_Lock lock = ::svm_process_ownership_lock(svm,process); SVM_Scheduler rscheduler = ::svm_scheduler_get(svm,scheduler); ::svm_scheduler_process_attach__raw(svm,rscheduler,process,0); VARIABLE_DELETE(lock); VARIABLE_DELETE(process); } %} help: %{ This instruction opens a TCP port and awaits for a TCP client to connect to the TCP server represented by this instruction. .P The instruction loops on each TCP client connection until it receives a signal to terminate. While the instruction is executed, the TCP listening port is opened. .P On each TCP connection: - it creates a process and attaches it to the specified scheduler, - initialise the memory of the kernel of the created process: - P references two addresses: the first one is the tcp.client device representing the client and the second one is the shared parameters, - the memory pointer and all its associated memory is instanciated in the new kernel, and the memory is shared between the server process and the new process. %} OPTION tcp.scheduler_run_limit -l INT help: %{ This option configures the TCP scheduler. It indicates the maximal number of processes running at the same time. %} OPTION tcp.scheduler_run_time -t INT help: %{ This option configures the TCP scheduler. It indicates the maximal time slice allocated to a running processes in milliseconds. %} SYSTEM INSTRUCTION tcp.scheduler_run_limit INT %{ auto l = ARGV_VALUE(0,integer); if(l<1) { ERROR_INTERNAL(FAILURE,"Invalid tcp.scheduler run limit"); } ::svm_scheduler_notify__raw(svm,::svm_scheduler_get(svm,CONST_PEP(tcp,scheduler)),l); %} help: %{ This instruction changes dynamically the maximal number of running process of the TCP scheduler. .P A value inferior to 1 will raise a FAILURE interruption. 
%} SCHEDULER tcp.scheduler %{ std::list _run; std::list _ready; std::list _waiting; std::list _other; std::atomic_size_t _scheduler_run_limit{1}; std::atomic_size_t _scheduler_run_time{1000}; static void list_clean(const void *svm, std::list& liste) { for(auto& p:liste) { VARIABLE_DELETE(p); } } static bool process_removal(const void *svm, std::list& liste, const SVM_Process processus) { auto it = std::find(liste.begin(),liste.end(),processus); if(it==liste.end()) return false; liste.erase(it); return true; } static void list_display(const void *svm,std::ostringstream& oss, const std::list& liste) { for(auto& p: liste) { SVM_String s = ::svm_process_print(svm,p); oss << " " << RAW_STRING(s) << std::endl; } } %} create default: %{} delete default: %{ scheduler_scheduler::list_clean(svm,object->_run); scheduler_scheduler::list_clean(svm,object->_ready); scheduler_scheduler::list_clean(svm,object->_waiting); scheduler_scheduler::list_clean(svm,object->_other); %} schedule object: %{ bool deleted = false; deleted = scheduler_scheduler::process_removal(svm,object->_run,process); if(not deleted) deleted = scheduler_scheduler::process_removal(svm,object->_ready,process); if(not deleted) deleted = scheduler_scheduler::process_removal(svm,object->_waiting,process); if(not deleted) deleted = scheduler_scheduler::process_removal(svm,object->_other,process); switch(state) { case RUNNING: object->_run.push_back(process); break; case SUSPENDED: case CONTINUE: object->_ready.push_back(process); break; case WAITING: object->_waiting.push_back(process); break; default: object->_other.push_back(process); break; } while(object->_run.size()_scheduler_run_limit) { if(object->_ready.empty()) break; SVM_Process p = object->_ready.front(); object->_run.push_back(p); object->_ready.pop_front(); ::svm_process_run__raw(svm,p,0); } return object->_scheduler_run_time.load(); %} notification object: %{ if(type == NOTIFICATION) { if(parameter>0) { if(parameter_scheduler_run_limit) { size_t 
suspend = object->_scheduler_run_limit-parameter; for(auto& p:object->_run) { ::svm_process_suspend(svm,p); if(--suspend==0) break; } } if(parameter>object->_scheduler_run_limit) { size_t run = parameter-object->_scheduler_run_limit; for(auto& p:object->_ready) { ::svm_process_run__raw(svm,p,0); if(--run==0) break; } } object->_scheduler_run_limit = parameter; } } if(type == TIMER and not object->_run.empty() and not object->_ready.empty()) { ::svm_process_suspend(svm,object->_run.front()); } return object->_scheduler_run_time.load(); %} attach object: %{ object->_other.push_back(process); VARIABLE_GLOBAL(process); return TRUE; %} detach object: %{ bool deleted = false; deleted = scheduler_scheduler::process_removal(svm,object->_run,process); if(not deleted) deleted = scheduler_scheduler::process_removal(svm,object->_ready,process); if(not deleted) deleted = scheduler_scheduler::process_removal(svm,object->_waiting,process); if(not deleted) deleted = scheduler_scheduler::process_removal(svm,object->_other,process); if(not deleted) return FALSE; VARIABLE_LOCAL(process); return TRUE; %} print object: %{ std::ostringstream oss; oss << "Executed processes:" << std::endl; scheduler_scheduler::list_display(svm,oss,object->_run); oss << "Ready processes:" << std::endl; scheduler_scheduler::list_display(svm,oss,object->_ready); oss << "Waiting processes:" << std::endl; scheduler_scheduler::list_display(svm,oss,object->_waiting); oss << "Other processes:" << std::endl; scheduler_scheduler::list_display(svm,oss,object->_other); return NEW_STRING(oss.str()); %} help: %{ This scheduler allows the parallel run of a maximum of processes, using a round robin policy when more processes require execution. Waiting processes are not counted in the process pool to run, letting them wait for external events. .P By default, the limit of parallel running processes is 1, and the time slice for process run is set to 1000ms. 
Theses values can be changed by plugin options, and the parallel run limit can also be changed by an instruction. %}