PLUGIN tribe lang: "C++" version: "2.0" date: "2023-06-20" author: "Julien BRUGUIER" maintainer: "Julien BRUGUIER " synopsis: "An easy mechanism to let SVM processes exchange values." description: %{ This plugin creates the concept of tribe of processes. A process is part of the tribe when attached to the tribe scheduler. .P When a process is part of the tribe, it can receive values from other processes within the tribe, and send values to another process just by specifying its name as a target. %} example: "Simple exchange" %{ .nf #!/usr/bin/env svm LOG PLUGIN "svmcom.so" PLUGIN "===PLUGINLIB===" PROCESS "a" CODE "main" INLINE :debug BREAK "a" :memory STR/s, (INT, STR)/p [ 17 , "lol" ] -> p :tribe.send "b" p "" -> &s :tribe.recv @&s p :com.message @(p/0) " " @(p/1) END SCHEDULER tribe.scheduler END PROCESS "b" CODE "main" INLINE :debug BREAK "b" :memory STR/s, (INT, STR)/p "" -> &s :tribe.recv @&s p :com.message @(p/0) " " @(p/1) :shift (p/0) "mdr" -> (p/1) :tribe.send @&s p END SCHEDULER tribe.scheduler END .fi %} example: "Complete example" %{ .nf #!/usr/bin/env svm LOG PLUGIN "svmstr.so" PLUGIN "svmcom.so" PLUGIN "===PLUGINLIB===" ARGUMENT STR port PROCESS "wait" CODE "main" INLINE :interruption GLOBAL TERMINATE end :interruption !com.interrupted ignore :memory com.device/server, com.device/client :com.open com.tcp < "0.0.0.0" @&port -> &server :label loop :com.command @&server CLIENT -> &client :tribe.send "recv" client :goto loop :label ignore :return :label end END MEMORY port SCHEDULER tribe.scheduler END PROCESS "recv" CODE "main" INLINE :interruption !tribe.interrupted end :memory STR/s, (com.device, STR)/m :label loop "" -> &s :tribe.recv @&s &m*1 :com.read @(m/0) com.line -> (m/1) :tribe.send "compute" m :goto loop :label end END SCHEDULER tribe.scheduler END PROCESS "compute" CODE "main" INLINE :interruption !tribe.interrupted end :memory STR/s, (com.device, STR)/m :label loop "" -> &s :tribe.recv @&s m :str.replace @(m/1) 1 CONST str.pattern "\n" => "" :str.join "{" @(m/1) "}\n" -> (m/1) :tribe.send "send" m :goto loop :label end END SCHEDULER tribe.scheduler END PROCESS "send" CODE "main" INLINE :interruption !tribe.interrupted end :memory STR/s, (com.device, STR)/m :label loop "" -> &s :tribe.recv @&s m :com.write @(m/0) @(m/1) [ , ] -> m :goto loop :label end END SCHEDULER tribe.scheduler END .fi %} includes: %{ #include #include %} code: %{ SVM_Event_Queue _events; std::map _addresses; SVM_Lock _addresses_lock; %} initialisation: %{ _events = ::svm_event_queue_new(svm); VARIABLE_GLOBAL(_events); _addresses_lock = ::svm_lock_new(svm); VARIABLE_GLOBAL(_addresses_lock); %} finalisation: %{ VARIABLE_LOCAL(_events); for(auto& a: _addresses) { VARIABLE_LOCAL(a.second); } VARIABLE_LOCAL(_addresses_lock); %} test: "simple" %{ PLUGIN "svmint.so" PLUGIN "svmstr.so" PROCESS "a" CODE "main" INLINE :memory STR/s, (INT, STR)/p, BLN/b [ 17 , "lol" ] -> p :tribe.send "b" p "" -> &s :tribe.recv @&s p :int.cmp @(p/0) = 18 -> &b :shutdown 1 :unless @&b TRUE :str.cmp @(p/1) = "wow" -> &b :shutdown 1 :unless @&b TRUE END SCHEDULER tribe.scheduler END PROCESS "b" CODE "main" INLINE :memory STR/s, (INT, STR)/p, BLN/b "" -> &s :tribe.recv @&s p :int.cmp @(p/0) = 17 -> &b :shutdown 1 :unless @&b TRUE :str.cmp @(p/1) = "lol" -> &b :shutdown 1 :unless @&b TRUE :shift (p/0) "wow" -> (p/1) :tribe.send @&s p END SCHEDULER tribe.scheduler END %} test: "wrong_destination" %{ PROCESS "a" CODE "main" INLINE :memory STR/s, (INT, STR)/p, BLN/b [ 17 , "lol" ] -> p :interruption !tribe.wrong_destination fail :tribe.send "b" p :shutdown 1 :label fail END SCHEDULER tribe.scheduler END %} test: "wrong_source" %{ PLUGIN "svmrun.so" PROCESS "a" CODE "main" INLINE :memory STR/s, (INT, STR)/p, BLN/b [ 17 , "lol" ] -> p :interruption !tribe.wrong_source fail :tribe.send "b" p :shutdown 1 :label fail END END PROCESS "b" CODE "main" INLINE :run.sleep HARD 1 END SCHEDULER tribe.scheduler END %} test: "wrong_values_size" %{ PROCESS "a" CODE "main" INLINE :memory STR/s, (INT, STR)/p [ 17 , "lol" ] -> p :tribe.send "b" p END SCHEDULER tribe.scheduler END PROCESS "b" CODE "main" INLINE :memory STR/s, INT/p "" -> &s :interruption MEMORY fail :tribe.recv @&s p :shutdown 1 :label fail END SCHEDULER tribe.scheduler END %} test: "wrong_values_type" %{ PROCESS "a" CODE "main" INLINE :memory STR/s, (INT, STR)/p [ 17 , "lol" ] -> p :tribe.send "b" p END SCHEDULER tribe.scheduler END PROCESS "b" CODE "main" INLINE :memory STR/s, INT*2/p "" -> &s :interruption MEMORY fail :tribe.recv @&s p :shutdown 1 :label fail END SCHEDULER tribe.scheduler END %} test: "double_process" %{ PLUGIN "svmint.so" PLUGIN "svmstr.so" PROCESS "a" CODE "main" INLINE :memory STR/s, (INT, STR)/p, BLN/b, INT/i [ 17 , "lol" ] -> p :tribe.send "b" p [ 17 , "lol" ] -> p :tribe.send "b" p 0 -> &i :label check "" -> &s [ , ] -> p :tribe.recv @&s p :int.cmp @(p/0) = 18 -> &b :shutdown 1 :unless @&b TRUE :str.cmp @(p/1) = "wow" -> &b :shutdown 1 :unless @&b TRUE :shift &i :goto check :when @&i IN &0*2 END SCHEDULER tribe.scheduler END PROCESS "b" CODE "main" INLINE :memory STR/s, (INT, STR)/p, BLN/b "" -> &s :tribe.recv @&s p :int.cmp @(p/0) = 17 -> &b :shutdown 1 :unless @&b TRUE :str.cmp @(p/1) = "lol" -> &b :shutdown 1 :unless @&b TRUE :shift (p/0) "wow" -> (p/1) :tribe.send @&s p END SCHEDULER tribe.scheduler END PROCESS "b" CODE "main" INLINE :memory STR/s, (INT, STR)/p, BLN/b "" -> &s :tribe.recv @&s p :int.cmp @(p/0) = 17 -> &b :shutdown 1 :unless @&b TRUE :str.cmp @(p/1) = "lol" -> &b :shutdown 1 :unless @&b TRUE :shift (p/0) "wow" -> (p/1) :tribe.send @&s p END SCHEDULER tribe.scheduler END %} test: "external_process" %{ PLUGIN "svmint.so" PLUGIN "svmstr.so" PLUGIN "svmrun.so" PROCESS "a" CODE "main" INLINE :memory STR/s, (INT, STR)/p, BLN/b, tribe.process/tp :tribe.process -> &tp [ 17 , "lol" ] -> p :tribe.send "b" p "" -> &s [ , ] -> p :tribe.recv @&s p [ ] -> tp :int.cmp @(p/0) = 20 -> &b :shutdown 1 :unless @&b TRUE :str.cmp @(p/1) = "wow" -> &b :shutdown 1 :unless @&b TRUE END END PROCESS "b" CODE "main" INLINE :memory STR/s, (INT, STR)/p, BLN/b, STR/c "" -> &s "" -> &c :tribe.recv @&s p :shift (p/0) :tribe.send "c" p :tribe.recv @&c p :shift (p/0) :tribe.send @&s p END SCHEDULER tribe.scheduler END PROCESS "c" CODE "main" INLINE :memory STR/s, (INT, STR)/p, BLN/b, tribe.process/tp :tribe.process -> &tp "" -> &s :tribe.recv @&s p :int.cmp @(p/0) = 18 -> &b :shutdown 1 :unless @&b TRUE :str.cmp @(p/1) = "lol" -> &b :shutdown 1 :unless @&b TRUE :shift (p/0) "wow" -> (p/1) :tribe.send @&s p END SCHEDULER run.parallel END %} DEFINE INTERRUPTION tribe.wrong_source help: "This interruption is raised when values are sent from a process not being in the tribe." INTERRUPTION tribe.wrong_destination help: "This interruption is raised when values are sent to a process not being in the tribe." INTERRUPTION tribe.interrupted help: "This interruption is raised when a process within the tribe is waiting for values, and has been notified to exit before receiving values." STRUCT tribe.identity %{ explicit struct_identity(const std::string& name) :_name(name) {} std::string _name; %} delete default: %{} help: "This structure is used to convey the name of the source process when values are received by a process within the tribe." STRUCT tribe.message %{ std::vector _values; %} delete default: %{ for(auto& v:object->_values) { VARIABLE_LOCAL(v); } %} help: "This structure contains the values to be transmitted between tribe processes." code: %{ struct Tools { static void add_address(const void *svm, const std::string& process_name) { SVM_LockGuard_Write write = ::svm_lock_writeguard_new(svm,_addresses_lock,FALSE); SVM_Event_Queue_Address address = nullptr; auto it=_addresses.find(process_name); if(it!=_addresses.end()) { address = it->second; } else { SVM_Structure identity = ::svm_structure_new(svm,CONST_PEP(tribe,identity),new struct_identity(process_name)); address = ::svm_event_address_new_struct(svm,identity); ::svm_variable_scope_set_shared(svm,address); ::svm_event_queue_join(svm,_events,address); } VARIABLE_GLOBAL(address); _addresses.insert(std::make_pair(process_name,address)); } static bool remove_address(const void *svm, const std::string& process_name) { SVM_LockGuard_Write write = ::svm_lock_writeguard_new(svm,_addresses_lock,FALSE); auto it = _addresses.find(process_name); if(it==_addresses.end()) return false; SVM_Event_Queue_Address address = it->second; VARIABLE_LOCAL(address); if(::svm_variable_scope_is_local(svm,address)) { ::svm_event_queue_leave(svm,_events,address); _addresses.erase(it); } return true; } }; %} TYPE tribe.process %{ explicit type_process(const std::string& name) :_name(name) { } operator std::string () const { return _name; } std::string _name; %} delete default: %{ Tools::remove_address(svm,object->_name); %} print default: %{} help: %{ This type fakes a process having an address within the tribe. It can be used by processes external to the tribe to exchange data with processes within the tribe. .P Usage of this type should be limited to corner cases. It is usually better to have processes attached to the tribe scheduler. %} WAITING INSTRUCTION tribe.process -> tribe.process %{ SVM_String n = ::svm_process_get_name(svm,CURRENT(process)); auto rn = RAW_STRING(n); auto a = new type_process(rn); Tools::add_address(svm,rn); return NEW_PLUGIN(tribe,process,a); %} help: %{ This instruction artificially adds the current process to the tribe without attaching it to the tribe scheduler. .P This lasts until the value is deleted. %} WAITING INSTRUCTION tribe.send STR:destination_name PTR:values %{ auto destination = ARGV_VALUE(0,string); auto destination_raw = RAW_STRING(destination); auto pointer = ::svm_parameter_value_get(svm,argv[1]); auto message_raw = new struct_message(); SVM_Structure message = ::svm_structure_new(svm,CONST_PEP(tribe,message),message_raw); SVM_Address address = ::svm_value_pointer_get_address(svm,pointer); SVM_Size size = ::svm_value_pointer_get_size(svm,pointer); for(SVM_Address a=address ; a<(address+size) ; ++a) { SVM_Value value = ::svm_memory_read_address(svm,CURRENT(kernel),a); VARIABLE_GLOBAL(value); message_raw->_values.push_back(value); ::svm_memory_write_address(svm,CURRENT(kernel),a,::svm_value_automatic_new_null(svm)); } SVM_LockGuard_Read read = ::svm_lock_readguard_new(svm,_addresses_lock,TRUE); auto source = ::svm_process_get_name(svm,CURRENT(process)); auto self = _addresses.find(RAW_STRING(source)); if(self==_addresses.end()) { ERROR_EXTERNAL(tribe,wrong_source,"Source is not a process within the tribe"); } auto it = _addresses.find(destination_raw); if(it==_addresses.end()) { ERROR_EXTERNAL(tribe,wrong_destination,"Destination is not a process within the tribe"); } VARIABLE_DELETE(read); ::svm_event_queue_push(svm,_events,it->second,self->second,message); %} help: %{ This instruction sends values referenced by the pointer to the tribe member having the destination_name. .I The instruction clears the values sent, and there are no longer accessible from the sender. .P The !tribe.wrong_source interruption is raised when the current process is not part of the tribe. .P The !tribe.wrong_destination interruption is raised when the destination process is not part of the tribe. .P There is no guarranted delivery of the values to the destination tribe member. %} WAITING INSTRUCTION tribe.recv MUTABLE STR:source_name PTR:emplace ? -> PTR:values %{ auto source = ::svm_parameter_value_get(svm,argv[0]); SVM_Value_Pointer pointer = nullptr; if(argc>1) { pointer = ::svm_parameter_value_get(svm,argv[1]); } SVM_LockGuard_Read read = ::svm_lock_readguard_new(svm,_addresses_lock,TRUE); auto destination = ::svm_process_get_name(svm,CURRENT(process)); auto self = _addresses.find(RAW_STRING(destination)); if(self==_addresses.end()) { ERROR_EXTERNAL(tribe,wrong_destination,"Destination is not a process within the tribe"); } SVM_Event_Queue_Address destination_address = self->second; VARIABLE_DELETE(read); SVM_Event_Queue_Address s; SVM_Structure message; ::svm_process_interruptionnotification_enable(svm,CURRENT(process)); SVM_Boolean result = ::svm_event_queue_pull(svm,_events,destination_address,&s,&message,TRUE); ::svm_process_interruptionnotification_disable(svm,CURRENT(process)); if(not result) { ERROR_EXTERNAL(tribe,interrupted,"Receive interrupted"); } auto source_raw = reinterpret_cast(::svm_structure_get_internal(svm,CONST_PEP(tribe,identity),::svm_event_address_get_struct(svm,s))); auto message_raw = reinterpret_cast(::svm_structure_get_internal(svm,CONST_PEP(tribe,message),message)); if(not pointer) { SVM_Memory_Zone zone = ::svm_memory_zone_new(svm); ::svm_memory_zone_append_internal__raw(svm,zone,AUTOMATIC,message_raw->_values.size()); pointer = ::svm_memory_allocate(svm,CURRENT(kernel),zone); } else { SVM_Address address = ::svm_value_pointer_get_address(svm,pointer); SVM_Size size = ::svm_value_pointer_get_size(svm,pointer); if(message_raw->_values.size()!=size) { ERROR_INTERNAL(MEMORY,"Provided pointer has invalid size"); } auto it = message_raw->_values.cbegin(); for(SVM_Address a=address ; a<(address+size) ; ++a,++it) { if(not ::svm_memory_address_is_writable(svm,CURRENT(kernel),a,*it)) { ERROR_INTERNAL(MEMORY,"Provided value has invalid type"); } } } SVM_Address address2 = ::svm_value_pointer_get_address(svm,pointer); SVM_Size size2 = ::svm_value_pointer_get_size(svm,pointer); auto it2 = message_raw->_values.cbegin(); for(SVM_Address a=address2 ; a<(address2+size2) ; ++a,++it2) { ::svm_value_state_set_movable(svm,*it2); ::svm_memory_write_address(svm,CURRENT(kernel),a,*it2); } ::svm_value_string_set(svm,source,NEW_STRING(source_raw->_name)); return pointer; %} help: %{ This instruction waits for values from another tribe member. .P The name of the tribe member sending the values is written into the source_name parameter. .P When the emplace pointer is specified, values are written to this pointer or a MEMORY interruption is raised if the values can not be written in an atomic way. Otherwise, the memory is allocated. The pointer to the received values is returned. .P The !tribe.wrong_destination interruption is raised when the current process is not part of the tribe. .P If the instruction gets a notification before receiving values, a !tribe.interrupted interruption is raised. %} SCHEDULER tribe.scheduler %{ std::vector _processes; %} create default: %{} delete default: %{ for(auto& p:object->_processes) { VARIABLE_LOCAL(p); } %} schedule object: %{ if(state==SUSPENDED) { ::svm_process_run__raw(svm,process,0); } return 0; %} attach default: %{ auto process_name = RAW_STRING(::svm_process_get_name(svm,process)); Tools::add_address(svm,process_name); VARIABLE_GLOBAL(process); object->_processes.push_back(process); return TRUE; %} detach default: %{ auto process_name = RAW_STRING(::svm_process_get_name(svm,process)); if(not Tools::remove_address(svm,process_name)) { return FALSE; } for(auto p=object->_processes.begin() ; p!=object->_processes.end() ; ++p) { if(*p==process) { VARIABLE_LOCAL(*p); object->_processes.erase(p); break; } } return TRUE; %} print object: %{ std::ostringstream oss; for(auto& p:object->_processes) { oss << RAW_STRING(::svm_process_print(svm,p)) << std::endl; } return NEW_STRING(oss.str()); %} help: %{ This is the tribe scheduler. .P When a process is attached to it, this process becomes a member of the tribe and can start to exchange values with other tribe members. .P The scheduler will run every process attached to it like the run.parallel does. %}