PLUGIN task lang: "C++" version: "1.0" date: "2023-02-23" author: "Julien BRUGUIER" maintainer: "Julien BRUGUIER " synopsis: "A simple tasking plugin to exploit parallelisation in the SVM." description: %{ This plugin enables the support of tasks. A task is a process having a specific identifier allowing it to exchange data with other tasks. .P The plugin uses an event queue to exchange data between tasks. %} example: "Parallel Fibonacci generation" %{ .nf #!/usr/bin/env svm LOG PLUGIN "svmrun.so" PLUGIN "svmint.so" PLUGIN "svmcom.so" PLUGIN "===PLUGINLIB===" ARGUMENT INT nb PROCESS "root" CODE "main" INLINE :memory task.id/root, task.id/fib, INT/i, INT/j, INT*2/r :task.id "root" -> &root 0 -> &i :label send :task.id "fib" -> &fib :task.new @&fib $"fib" run.parallel @&i -> &j :task.send @&root => @&fib j :shift &i :goto send :when @&i IN &0*@&nb 0 -> &i :label recv :task.recv @&root r :com.message @(r/0) " => " @(r/1) :shift &i :goto recv :when @&i IN &0*@&nb :shutdown 0 :symbol fib :memory (task.id, INT)/i, (PTR,INT*2)/r :task.recv @&P SOURCE i [ , @(i/1) , ] -> r :call fib_internal r :task.send @&P => @(i/0) (&r+1)*2 :shutdown :label fib_internal :memory (PTR, INT*2)*2, BLN -> &P :int.cmp @(P/1) < 2 -> (@&P/6) :goto fib_recursion :unless @(@&P/6) TRUE @(P/1) -> (P/2) :return :label fib_recursion :int.sub @(P/1) 1 -> (@&P/1) :call fib_internal &@&P*3 :int.sub @(P/1) 2 -> (@&P/4) :call fib_internal (&@&P+3)*3 :int.add @(@&P/2) @(@&P/5) -> (P/2) :return END MEMORY nb END .fi %} includes: %{ #include %} code: %{ SVM_Event_Queue events; %} initialisation: %{ events = ::svm_event_queue_new(svm); ::svm_variable_scope_set_global(svm,events); %} finalisation: %{ ::svm_variable_scope_set_local(svm,events); %} DEFINE STRUCT task.id %{ SVM_Value_String _name; std::vector _values; %} delete default: %{ ::svm_variable_scope_set_local(svm,object->_name); for(const auto& v: object->_values) { ::svm_variable_scope_set_local(svm,v); } %} help: %{ This structure contains data attached to a task. .P The content of this structure can be read by other tasks. %} TYPE task.id %{ SVM_Event_Queue_Address _address; %} delete default: %{ ::svm_variable_scope_set_local(svm,object->_address); if(::svm_variable_scope_is_local(svm,object->_address)) { ::svm_event_queue_leave(svm,events,object->_address); } %} copy default: %{ ::svm_variable_scope_set_global(svm,object->_address); %} print object: %{ auto s = reinterpret_cast(::svm_structure_get_internal(svm,CONST_PEP(task,id),::svm_event_address_get_struct(svm,object->_address))); return ::svm_value_string_get(svm,s->_name); %} help: %{ This type is a task identifier. .P A task identifier is required to create a task. %} STRUCT task.event %{ std::vector _values; %} delete default: %{ for(const auto& v: object->_values) { ::svm_variable_scope_set_local(svm,v); } %} help: %{ This structure conveys data between tasks. %} INSTRUCTION task.id STR : name PTR : values ? -> task.id %{ SVM_Value_String name = ::svm_parameter_value_get(svm,argv[0]); SVM_Value_Pointer values = nullptr; if(argc>1) { values = ::svm_parameter_value_get(svm,argv[1]); } struct_id *sid = new struct_id; sid->_name = name; ::svm_variable_scope_set_global(svm,name); if(argc>1) { SVM_Address a = ::svm_value_pointer_get_address(svm,values); SVM_Size s = ::svm_value_pointer_get_size(svm,values); for(SVM_Address i=a ; i<(a+s) ; ++i) { SVM_Value v = ::svm_memory_read_address(svm,CURRENT(kernel),i); ::svm_variable_scope_set_global(svm,v); sid->_values.push_back(v); } } SVM_Structure id = ::svm_structure_new(svm,CONST_PEP(task,id),sid); SVM_Event_Queue_Address address = ::svm_event_address_new_struct(svm,id); ::svm_event_queue_join(svm,events,address); ::svm_variable_scope_set_shared(svm,address); ::svm_variable_scope_set_global(svm,address); return NEW_PLUGIN(task,id,new type_id { address }); %} help: %{ This instruction create a task identifier. .P A name has to be provided for the task, and optional values can be attached to the identifier. %} INSTRUCTION task.name task.id -> STR %{ auto id = ARGV_PLUGIN(0,task,id); auto s = reinterpret_cast(::svm_structure_get_internal(svm,CONST_PEP(task,id),::svm_event_address_get_struct(svm,id->_address))); return s->_name; %} help: "This instruction retrieves the task name from a task identifier" INSTRUCTION task.values task.id -> PTR %{ auto id = ARGV_PLUGIN(0,task,id); auto s = reinterpret_cast(::svm_structure_get_internal(svm,CONST_PEP(task,id),::svm_event_address_get_struct(svm,id->_address))); SVM_Memory_Zone zone = ::svm_memory_zone_new(svm); ::svm_memory_zone_append_internal__raw(svm,zone,AUTOMATIC,s->_values.size()); SVM_Value_Pointer pointer = ::svm_memory_allocate(svm,CURRENT(kernel),zone); SVM_Address a = ::svm_value_pointer_get_address(svm,pointer); for(const auto& v: s->_values) { ::svm_memory_write_address(svm,CURRENT(kernel),a++,v); } return pointer; %} help: "This instruction retrieves the task values attached to a task identifier" WAITING INSTRUCTION task.new task.id : name SYM : code PEP : scheduler PEP : sequencer ? %{ auto id = ARGV_PLUGIN(0,task,id); SVM_Value_Symbol symbol = ::svm_parameter_value_get(svm,argv[1]); SVM_Value_PluginEntryPoint scheduler = ::svm_parameter_value_get(svm,argv[2]); SVM_Value_PluginEntryPoint sequencer = nullptr; if(argc>3) { sequencer = ::svm_parameter_value_get(svm,argv[3]); } auto name = reinterpret_cast(::svm_structure_get_internal(svm,CONST_PEP(task,id),::svm_event_address_get_struct(svm,id->_address))); SVM_Process task = ::svm_process_new_symbol(svm,name->_name,sequencer,FALSE,symbol,TRUE,FALSE,FALSE,nullptr); ::svm_process_ownership_get_local(svm,task); if(not ::svm_scheduler_process_attach__raw(svm,::svm_scheduler_get(svm,scheduler),task,0)) { ERROR_INTERNAL(FAILURE,"Task rejected by scheduler"); } SVM_Kernel kernel = ::svm_process_kernel_get_current(svm,task); SVM_Memory_Zone zone = ::svm_memory_zone_new(svm); ::svm_memory_zone_append_external__raw(svm,zone,CONST_PEP(task,id),1); SVM_Value_Pointer pointer = ::svm_memory_allocate(svm,kernel,zone); ::svm_processor_set_currentpointer(svm,kernel,pointer); ::svm_memory_write_address(svm,kernel,::svm_value_pointer_get_address(svm,pointer),::svm_value_copy(svm,::svm_parameter_value_get(svm,argv[0]))); %} help: %{ This instruction creates a task. .P A process is created by this instruction and attached to the scheduler given as parameter. An optional sequencer can be provided. The code executed by the task is specified with the symbol. .P The task uses the task identifier to get its name and identity. This identifer is accessible within the task at the address .nf &P .fi .P This instruction can raise a FAILURE interruption when the sequencer does not exist or the scheduler does not exist or refuse the process. %} WAITING INSTRUCTION task.recv task.id : destination 'SOURCE' ? PTR : values ? -> PTR ? %{ auto destination = ARGV_PLUGIN(0,task,id); bool reply_source = false; SVM_Value_Pointer values = nullptr; if(argc>1) { SVM_Size c = 1; if(::svm_parameter_type_is_keyword(svm,argv[1])) { reply_source = true; ++c; } if(argc>c) { values = ::svm_parameter_value_get(svm,argv[c]); } } SVM_Event_Queue_Address source; SVM_Structure event; ::svm_process_interruptionnotification_enable(svm,CURRENT(process)); SVM_Boolean received = ::svm_event_queue_pull(svm,events,destination->_address,&source,&event,TRUE); ::svm_process_interruptionnotification_disable(svm,CURRENT(process)); if(not received) { return NEW_NULL_VALUE(pointer); } auto s = reinterpret_cast(::svm_structure_get_internal(svm,CONST_PEP(task,id),::svm_event_address_get_struct(svm,source))); auto e = reinterpret_cast(::svm_structure_get_internal(svm,CONST_PEP(task,event),event)); SVM_Size re = (e->_values.size()+(reply_source?1:0)); auto rid = new type_id { source }; ::svm_variable_scope_set_global(svm,source); SVM_Value_Plugin id = NEW_PLUGIN(task,id,rid); if(values) { SVM_Size vs = ::svm_value_pointer_get_size(svm,values); if(vs!=re) { ERROR_INTERNAL(MEMORY,"Invalid pointer size"); } SVM_Address a = ::svm_value_pointer_get_address(svm,values); if(reply_source) { if(not ::svm_memory_address_is_writable(svm,CURRENT(kernel),a++,id)) { ERROR_INTERNAL(MEMORY,"Unable to write task id"); } } for(const auto& v:e->_values) { if(not ::svm_memory_address_is_writable(svm,CURRENT(kernel),a,v)) { std::ostringstream oss; oss << "Unable to write value at address &" << a; ERROR_INTERNAL(MEMORY,oss.str().c_str()); } ++a; } } else { SVM_Memory_Zone zone = ::svm_memory_zone_new(svm); ::svm_memory_zone_append_internal__raw(svm,zone,AUTOMATIC,re); values = ::svm_memory_allocate(svm,CURRENT(kernel),zone); } SVM_Address wa = ::svm_value_pointer_get_address(svm,values); if(reply_source) { ::svm_memory_write_address(svm,CURRENT(kernel),wa++,id); } for(const auto& v:e->_values) { ::svm_memory_write_address(svm,CURRENT(kernel),wa++,v); } return values; %} help: %{ This instruction allows a task to retrieve an event from another task. The instruction will wait until an event is received, or when interrupted. .P The specified task identifier indicates on which address the task should listen to events. .P When the SOURCE keyword is present, the first address of the result will contain the task identifier of the task sending the event. .P When the optional pointer is present, it is used to store the received event (and the optional task identifier). The instruction raises a MEMORY interrupted when the received data can not be stored into the given pointer (the data are lost in this case). The memory is allocated otherwise. .P The instruction returns a null pointer when interrupted, or the pointer containing the received data. %} INSTRUCTION task.send task.id : source => task.id : destination PTR : values %{ auto source = ARGV_PLUGIN(0,task,id); auto destination = ARGV_PLUGIN(2,task,id); SVM_Value_Pointer values = ::svm_parameter_value_get(svm,argv[3]); SVM_Address a = ::svm_value_pointer_get_address(svm,values); SVM_Size s = ::svm_value_pointer_get_size(svm,values); for(SVM_Size i=a ; i<(a+s) ; ++i) { if(not ::svm_memory_address_is_initialised(svm,CURRENT(kernel),i)) { std::ostringstream oss; oss << "Unable to read address &" << i; ERROR_INTERNAL(MEMORY,oss.str().c_str()); } } auto e = new struct_event; a = ::svm_value_pointer_get_address(svm,values); s = ::svm_value_pointer_get_size(svm,values); for(SVM_Size i=a ; i<(a+s) ; ++i) { SVM_Value v = ::svm_memory_read_address(svm,CURRENT(kernel),i); ::svm_variable_scope_set_global(svm,v); e->_values.push_back(v); ::svm_memory_write_address(svm,CURRENT(kernel),i,::svm_value_automatic_new_null(svm)); } SVM_Structure event = NEW_STRUCT(task,event,e); ::svm_event_queue_push(svm,events,destination->_address,source->_address,event); %} help: %{ This instruction allows a task to send an event to another task. .P The source task identifier can be retrieved by the receiver. .P The destination task identifier specifies to which task the event shall be sent. .P The values are specified by a pointer. If one address of the pointer is not initialised, a MEMORY interrupted is raised. The values are removed from the memory (the addresses remain defined but null). %}