PLUGIN tcp lang: "C++" version: "1.0" date: "2024-05-01" author: "Julien BRUGUIER" maintainer: "Julien BRUGUIER " synopsis: "A fast implementation of TCP, compatible with com plugin." description: %{ This plugin allows simple and fast management of TCP devices, compatible with com plugin. .P It eases the creation of applications based on a TCP server or a TCP client. %} seealso: %{ .BR svm_plugin_com (7) for com plugin documentation. %} example: "Simple server" %{ .nf #!/usr/bin/env svm LOG PLUGIN "svmcom.so" PLUGIN "===PLUGINLIB===" PLUGIN "svmrun.so" PLUGIN "svmstr.so" OPTION -i STR host ARGUMENT STR service PROCESS "server" CODE "main" INLINE :goto has_host :when &host INITIALISED "localhost" -> &host :label has_host :tcp.server @&host @&service 5 > "client" $"client" P run.parallel :symbol client :memory STR/s :com.read @&P com.line -> &s :shutdown :unless &s INITIALISED :str.replace @&s ALL CONST str.pattern "\n" => "" :str.join "<" @&s ">\n" -> &s :com.write @&P @&s END MEMORY host service END .fi %} example: "Simple client" %{ .nf #!/usr/bin/env svm LOG PLUGIN "svmcom.so" PLUGIN "===PLUGINLIB===" PLUGIN "svmstr.so" OPTION -i STR host ARGUMENT STR service PROCESS "client" CODE "main" INLINE :goto has_host :when &host INITIALISED "localhost" -> &host :label has_host :memory com.device/c, STR/s :com.open tcp.client @&host @&service -> &c :com.write @&c "lol\n" :com.read @&c com.line -> &s :str.replace @&s ALL CONST str.pattern "\n" => "" :com.message @&s END MEMORY host service END .fi %} includes: %{ #include #include #include #include #include #include #include %} code: %{ #define BUFFER_SIZE 4096 std::string strerror_local(int e) { return ::strerror(e); } std::string gai_strerror_local(int e) { return ::gai_strerror(e); } struct Memoire { Memoire() { res0 = nullptr; } ~Memoire() { if(res0) ::freeaddrinfo(res0); } struct addrinfo *res0; }; bool resolution_ip_port(const struct sockaddr* sock, const socklen_t longueur, std::string& ip, std::string& port) { char 
host[BUFFER_SIZE+1]; char serv[BUFFER_SIZE+1]; bool resultat=::getnameinfo(sock,longueur,host,BUFFER_SIZE,serv,BUFFER_SIZE,NI_NUMERICHOST)==0; if(resultat) { ip = host; port = serv; } return resultat; } void log_erreur(struct addrinfo *res, std::ostringstream& os, const std::string& fonction, int erreur) { std::string ip; std::string port; os << std::endl << fonction << " "; if(resolution_ip_port(res->ai_addr,res->ai_addrlen,ip,port)) { os << "(" << ip << " : " << port << ")"; } os << ": " << ::strerror_local(erreur); } template static int ouvrir(const void *svm, const int type, const std::string& ip, const std::string& port, const int tcp_file) { int fd; struct addrinfo hints, *res; int error; Memoire memoire; std::ostringstream details; details << "Unable to open TCP socket."; ::memset(&hints, 0, sizeof(hints)); hints.ai_family = PF_UNSPEC; hints.ai_socktype = type; error = ::getaddrinfo(ip.c_str(), port.c_str(), &hints, &memoire.res0); if (error) { details << std::endl << "getaddrinfo: " << ::gai_strerror_local(error); ::svm_processor_current_raise_error_internal__raw(svm,DEVICE,details.str().c_str()); } fd = -1; for (res = memoire.res0 ; res ; res = res->ai_next) { fd = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol); if (fd < 0) { log_erreur(res,details,"socket",errno); continue; } if(appelle_bind) { if (::bind(fd, res->ai_addr, res->ai_addrlen) < 0) { log_erreur(res,details,"bind",errno); ::close(fd); fd = -1; continue; } } if(appelle_listen) { if(::listen(fd,tcp_file)<0) { log_erreur(res,details,"listen",errno); ::close(fd); fd = -1; continue; } } if(appelle_connect) { if (::connect(fd, res->ai_addr, res->ai_addrlen) < 0) { log_erreur(res,details,"connect",errno); ::close(fd); fd = -1; continue; } } break; } if (fd < 0) { details << std::endl; ::svm_processor_current_raise_error_internal__raw(svm,DEVICE,details.str().c_str()); } return fd; } %} test: "Simple_exchange" %{ PLUGIN "svmcom.so" PLUGIN "svmint.so" PLUGIN "svmrun.so" PLUGIN "svmstr.so" 
PROCESS "s" CODE "main" INLINE :memory STR/s [ "abc" ] -> s :tcp.server "localhost" "31415" 5 > "sc" $"client" s run.parallel :shutdown 0 :symbol client :memory STR/s, INT/i :com.read @&P com.size 2 -> &s :int.parse @&s -> &i :int.mul @&i @&i -> &i :com.write @&P @&i " " @&@(P/1) "\n" END AUTOTERMINATED END PROCESS "c" CODE "main" INLINE :memory com.device/c, STR/s, BLN/b :com.open tcp.client "localhost" "31415" -> &c :com.write @&c 17 :com.read @&c com.line -> &s :str.cmp @&s = "289 abc\n" -> &b :shutdown 1 :unless @&b TRUE END SCHEDULER run.parallel END %} test: "No_client" %{ PLUGIN "svmcom.so" PLUGIN "svmrun.so" PROCESS "s" CODE "main" INLINE :memory STR/s [ "abc" ] -> s :tcp.server "localhost" "31415" 5 > "sc" $"client" s run.parallel :shutdown 0 :symbol client :shutdown 1 END AUTOTERMINATED END PROCESS "S" CODE "main" INLINE :run.sleep HARD 1 END SCHEDULER run.parallel END %} USE TYPE com.device help: %{ This plugin can create com.device values to allow read, write and idle operations. %} FUNCTION com.device $tcp.client -> com.device ? help: %{ This function is used to create a com.device from a tcp.client channel. %} INTERRUPTION com.interrupted help: %{ Read operation can raise this interruption while interrupted. %} DEFINE STRUCT tcp.client %{ struct_client(const std::string& host, const std::string& port) :_host(host),_port(port) {} std::string _host; std::string _port; int _socket {0}; bool _blocking{true}; bool _read_half_closed{false}; bool _write_half_closed{false}; %} delete default: %{} help: %{ This structure is the tcp.client channel, containing data for socket management. 
%}

FUNCTION tcp.device_client_open STR:host STR:service -> $tcp.client
%{
	// Connects to host/service and wraps the connected socket in a new
	// tcp.client structure. ouvrir raises a DEVICE error when no connection
	// can be established.
	SVM_String shost = ARGV_VALUE(0,string);
	SVM_String sservice = ARGV_VALUE(1,string);
	// NOTE(review): the template arguments were lost in the copy under
	// review; <bind=false, listen=false, connect=true> is the client
	// configuration of ouvrir - confirm against its declaration.
	const int fd_client = ouvrir<false,false,true>(svm,SOCK_STREAM,RAW_STRING(shost),RAW_STRING(sservice),0);
	struct_client *client = new struct_client(RAW_STRING(shost),RAW_STRING(sservice));
	client->_socket = fd_client;
	return NEW_STRUCT(tcp,client,client);
%}
help:
%{
This function is used by :com.open instruction.
.P
This creates a tcp.client device used by clients to connect to servers.
%}

FUNCTION tcp.device_client_print $tcp.client -> STR
%{
	// Builds "TCP client <host>:<port> (<socket>)", followed by
	// " non blocking" when the device is in non-blocking mode.
	struct_client *client = ARGV_STRUCT(0,tcp,client);
	std::ostringstream oss;
	oss	<< "TCP client " << client->_host << ":" << client->_port
		<< " (" << client->_socket << ")"
		// The leading space keeps the mode apart from the closing parenthesis.
		<< (client->_blocking?"":" non blocking");
	return NEW_VALUE(string,NEW_STRING(oss.str()));
%}
help:
%{
This function is used to create a string representation of the tcp.client device.
%}

FUNCTION tcp.device_client_read $tcp.client -> STR ?
%{
	// Reads at most BUFFER_SIZE bytes from the socket.
	// Returns the bytes read, a null string when recv returns 0, or an empty
	// string when a non-blocking socket has nothing to deliver.
	// Raises com.interrupted when recv is interrupted, and a DEVICE error on
	// any other failure or when the read side was closed by the CLOSE command.
	struct_client *client = ARGV_STRUCT(0,tcp,client);
	if(client->_read_half_closed)
	{
		ERROR_INTERNAL(DEVICE,"Can not read from closed device");
	}
	char buffer[BUFFER_SIZE+1];
	::svm_process_pause(svm,CURRENT(process));
	::svm_process_interruptionnotification_enable(svm,CURRENT(process));
	const ssize_t received = ::recv(client->_socket,buffer,BUFFER_SIZE,0);
	// Saved immediately: the API calls below may overwrite errno before it
	// is tested.
	const int recv_errno = errno;
	::svm_process_interruptionnotification_disable(svm,CURRENT(process));
	::svm_process_resume(svm,CURRENT(process));
	if(received<0)
	{
		if(recv_errno==EINTR)
		{
			ERROR_EXTERNAL(com,interrupted,"Read interrupted.");
		}
		if(((recv_errno==EAGAIN) or (recv_errno==EWOULDBLOCK)) and not client->_blocking)
		{
			return ::svm_value_string_new__raw(svm,"");
		}
		std::ostringstream oss;
		oss << "Read error on TCP socket: " << ::strerror_local(recv_errno);
		ERROR_INTERNAL(DEVICE,oss.str().c_str());
	}
	if(received==0)
	{
		return NEW_NULL_VALUE(string);
	}
	return ::svm_value_string_new__buffer(svm,buffer,received);
%}
help:
%{
This function is used by :com.read instruction on a tcp.client device.
.P
The function can raise a com.interrupted interruption when an interruption notification is caught.
%}

FUNCTION tcp.device_client_write $tcp.client STR
%{
	// Sends the whole string on the socket.
	// send may accept only a part of the data: the remainder is sent by
	// further calls until everything is written. A DEVICE error is raised
	// when send fails, when it makes no progress, or when the write side was
	// closed by the CLOSE command.
	struct_client *client = ARGV_STRUCT(0,tcp,client);
	if(client->_write_half_closed)
	{
		ERROR_INTERNAL(DEVICE,"Can not write to closed device");
	}
	SVM_String text = ARGV_VALUE(1,string);
	size_t sent = 0;
	while(sent<text.size)
	{
		// MSG_NOSIGNAL: a closed peer is reported through errno (EPIPE)
		// instead of a SIGPIPE signal.
		const ssize_t written = ::send(client->_socket,text.string+sent,text.size-sent,MSG_NOSIGNAL);
		if(written<0)
		{
			if(errno==EINTR)
			{
				// Nothing was sent by this call: try again.
				continue;
			}
			std::ostringstream oss;
			oss << "Write error on TCP socket: " << ::strerror_local(errno);
			ERROR_INTERNAL(DEVICE,oss.str().c_str());
		}
		if(written==0)
		{
			// No progress: report it instead of looping forever.
			std::ostringstream oss;
			oss << "Write error on TCP socket: Incomplete write to device.";
			ERROR_INTERNAL(DEVICE,oss.str().c_str());
		}
		sent += static_cast<size_t>(written);
	}
%}
help:
%{
This function is used by the :com.write instruction on a tcp.client device.
%}

FUNCTION tcp.device_client_idle $tcp.client MUTABLE INT 3
%{
	// Stores the socket descriptor in each of the three mutable integers
	// received after the device.
	// NOTE(review): presumably the read, write and error descriptors
	// expected by com.idle - confirm against the com plugin documentation.
	struct_client *client = ARGV_STRUCT(0,tcp,client);
	for(size_t i=1 ; i<4 ; ++i)
	{
		SVM_Value_Integer c = ::svm_parameter_value_get(svm,argv[i]);
		::svm_value_integer_set(svm,c,client->_socket);
	}
%}
help:
%{
This function is used by the :com.idle instruction on a tcp.client device.
%}

FUNCTION tcp.device_client_command $tcp.client . * -> VALUE ?
%{ struct_client *client = ARGV_STRUCT(0,tcp,client); for(SVM_Size index=1 ; index=argc) or (::svm_parameter_type_is_marker(svm,argv[index])==FALSE)) { ERROR_INTERNAL(FAILURE,"Invalid close command"); } SVM_String rdirection = ::svm_parameter_marker_get(svm,argv[index]); auto direction = RAW_STRING(rdirection); if(direction==">") { ::shutdown(client->_socket,SHUT_WR); client->_write_half_closed = true; } else if(direction=="<") { ::shutdown(client->_socket,SHUT_RD); client->_read_half_closed = true; } else { ERROR_INTERNAL(FAILURE,"Invalid close command"); } } else if(command=="LINGER") { ++index; if(index>=argc) { ERROR_INTERNAL(FAILURE,"Invalid linger command"); } linger linger; linger.l_onoff = 0; linger.l_linger = 0; if(::svm_parameter_type_is_value(svm,argv[index])==TRUE) { SVM_Value rvalue = ::svm_parameter_value_get(svm,argv[index]); if(::svm_value_type_is_integer(svm,rvalue)==FALSE) { ERROR_INTERNAL(FAILURE,"Invalid linger command"); } auto l = ::svm_value_integer_get(svm,rvalue); if(l<0) { ERROR_INTERNAL(FAILURE,"Invalid linger command"); } linger.l_onoff = 1; linger.l_linger = l; } else if(::svm_parameter_type_is_keyword(svm,argv[index])==TRUE) { SVM_String roff = ::svm_parameter_keyword_get(svm,argv[index]); if(RAW_STRING(roff)!="OFF") { ERROR_INTERNAL(FAILURE,"Invalid linger command"); } } else { ERROR_INTERNAL(FAILURE,"Invalid linger command"); } if(::setsockopt(client->_socket,SOL_SOCKET,SO_LINGER,&linger,sizeof(linger))) { std::ostringstream oss; oss << "Unable to set linger mode on TCP socket." 
<< std::endl << ::strerror_local(errno) << std::endl; ERROR_INTERNAL(DEVICE,oss.str().c_str()); } } else if(command=="BLOCKING") { ++index; if(index>=argc) { ERROR_INTERNAL(FAILURE,"Invalid blocking command"); } if(::svm_parameter_type_is_value(svm,argv[index])==FALSE) { ERROR_INTERNAL(FAILURE,"Invalid blocking command"); } SVM_Value rtrigger = ::svm_parameter_value_get(svm,argv[index]); if(::svm_value_type_is_boolean(svm,rtrigger)==FALSE) { ERROR_INTERNAL(FAILURE,"Invalid blocking command"); } SVM_Boolean trigger = ::svm_value_boolean_get(svm,rtrigger); client->_blocking=trigger==TRUE; if(client->_blocking) { int flags = ::fcntl(client->_socket, F_GETFL, 0); ::fcntl(client->_socket, F_SETFL, flags bitand (compl O_NONBLOCK)); } else { int flags = ::fcntl(client->_socket, F_GETFL, 0); ::fcntl(client->_socket, F_SETFL, flags bitor O_NONBLOCK); } } else if(command=="NODELAY") { ++index; if(index>=argc) { ERROR_INTERNAL(FAILURE,"Invalid nodelay command"); } if(::svm_parameter_type_is_value(svm,argv[index])==FALSE) { ERROR_INTERNAL(FAILURE,"Invalid nodelay command"); } SVM_Value rtrigger = ::svm_parameter_value_get(svm,argv[index]); if(::svm_value_type_is_boolean(svm,rtrigger)==FALSE) { ERROR_INTERNAL(FAILURE,"Invalid nodelay command"); } SVM_Boolean trigger = ::svm_value_boolean_get(svm,rtrigger); int itrigger = trigger?1:0; ::setsockopt( client->_socket, IPPROTO_TCP, TCP_NODELAY, (void *)&itrigger, sizeof(itrigger)); } else if(command=="REMOTE") { ++index; if(index>=argc) { ERROR_INTERNAL(FAILURE,"Invalid remote command"); } if(::svm_parameter_type_is_keyword(svm,argv[index])==FALSE) { ERROR_INTERNAL(FAILURE,"Invalid remote command"); } SVM_String rattr = ::svm_parameter_keyword_get(svm,argv[index]); auto attr = RAW_STRING(rattr); if(attr=="IP") { return NEW_VALUE(string,NEW_STRING(client->_host)); } else if(attr=="PORT") { return NEW_VALUE(string,NEW_STRING(client->_port)); } else { ERROR_INTERNAL(FAILURE,"Invalid remote command"); } } else { 
ERROR_INTERNAL(FAILURE,"Invalid command name"); } } %} help: %{ This function is used by the :com.command instruction on a tcp.client device. .P It supports these commands: - 'CLOSE' [ < > ] for socket half closure, - 'LINGER' [ 'OFF' INT ] for TCP linger mode (OFF to disable, an integer to enable), - 'BLOCKING' BLN for socket in blocking or non-blocking mode, - 'NODELAY' BLN for TCP Nagle disabling or enabling, - 'REMOTE' [ 'IP' 'PORT' ] to get information on the remote peer of a tcp.client device. %} FUNCTION tcp.device_client_close $tcp.client -> BLN %{ struct_client *client = ARGV_STRUCT(0,tcp,client); ::shutdown(client->_socket,SHUT_RDWR); ::close(client->_socket); return NEW_VALUE(boolean,TRUE); %} help: %{ This function is used by the com.device destructor on a tcp.client device. %} WAITING INSTRUCTION tcp.server STR:host STR:service INT:queue > STR:process_name SYM:code PTR:memory PEP:scheduler PEP:sequencer ? %{ SVM_String shost = ARGV_VALUE(0,string); SVM_String sservice = ARGV_VALUE(1,string); long long int client_queue_size = ARGV_VALUE(2,integer); if(client_queue_size<1) { ERROR_INTERNAL(FAILURE,"Invalid client queue size"); } SVM_String sclient = ARGV_VALUE(4,string); SVM_Value_Symbol code = ::svm_parameter_value_get(svm,argv[5]); SVM_Value_Pointer memory = ::svm_parameter_value_get(svm,argv[6]); SVM_Value_PluginEntryPoint scheduler = ::svm_parameter_value_get(svm,argv[7]); SVM_Value_PluginEntryPoint sequencer = nullptr; if(argc>8) { sequencer = ::svm_parameter_value_get(svm,argv[8]); } int fd_server = ouvrir(svm,SOCK_STREAM,RAW_STRING(shost),RAW_STRING(sservice),client_queue_size); for(;;) { errno=0; ::svm_process_interruptionnotification_enable(svm,CURRENT(process)); struct sockaddr sockclient; socklen_t l = sizeof(sockclient); int fd_client = ::accept(fd_server,&sockclient,&l); ::svm_process_interruptionnotification_disable(svm,CURRENT(process)); if(fd_client<0) { if(errno==EINTR) { ::shutdown(fd_server,SHUT_RDWR); ::close(fd_server); RETURN; } 
else { std::ostringstream oss; oss << "Error raised while waiting for client:" << ::strerror_local(errno); ERROR_INTERNAL(DEVICE,oss.str().c_str()); } } std::string remote_host; std::string remote_port; resolution_ip_port(&sockclient,l,remote_host,remote_port); struct_client *rclient = new struct_client(remote_host,remote_port); rclient->_socket = fd_client; SVM_Structure client = NEW_STRUCT(tcp,client,rclient); SVM_Parameter *create_params = ::svm_parameter_array_new(svm,1); create_params[0] = ::svm_parameter_structure_new(svm,client); SVM_Value_Plugin device = ::svm_function_call(svm,CONST_PEP(com,device),1,create_params); VARIABLE_DELETE(client); ::svm_value_state_set_movable(svm,device); VARIABLE_DELETE(create_params); SVM_Process process = ::svm_process_new_symbol__string(svm,sclient,sequencer,FALSE,code,TRUE,FALSE,FALSE,nullptr); SVM_Process_Lock lock = ::svm_process_ownership_lock(svm,process); SVM_Kernel kernel = ::svm_process_kernel_get_current(svm,process); SVM_Value_Pointer *all_memory = ::svm_memory_pointer_list_accessible(svm,CURRENT(kernel),memory); for(SVM_Value_Pointer *it_memory=all_memory ; *it_memory ; ++it_memory) { SVM_Address address = ::svm_value_pointer_get_address(svm,*it_memory); SVM_Size size = ::svm_value_pointer_get_size(svm,*it_memory); SVM_Memory_Zone zone = ::svm_memory_zone_new(svm); ::svm_memory_zone_append_internal__raw(svm,zone,AUTOMATIC,size); ::svm_memory_allocate_address(svm,kernel,zone,address); VARIABLE_DELETE(zone); ::svm_memory_share(svm,CURRENT(kernel),*it_memory,kernel,*it_memory); } SVM_Memory_Zone zone = ::svm_memory_zone_new(svm); ::svm_memory_zone_append_external__raw(svm,zone,CONST_PEP(com,device),1); ::svm_memory_zone_append_internal__raw(svm,zone,POINTER,1); SVM_Value_Pointer pointer = ::svm_memory_allocate(svm,kernel,zone); ::svm_value_state_set_movable(svm,pointer); SVM_Address address = ::svm_value_pointer_get_address(svm,pointer); ::svm_memory_write_address(svm,kernel,address,device); 
::svm_memory_write_address(svm,kernel,address+1,all_memory[0]); ::svm_processor_set_currentpointer(svm,kernel,pointer); VARIABLE_DELETE(zone); VARIABLE_DELETE(pointer); VARIABLE_DELETE(device); VARIABLE_DELETE(kernel); VARIABLE_DELETE(all_memory); SVM_Scheduler rscheduler = ::svm_scheduler_get(svm,scheduler); ::svm_scheduler_process_attach__raw(svm,rscheduler,process,0); VARIABLE_DELETE(lock); VARIABLE_DELETE(process); } %} help: %{ This instruction opens a TCP port and awaits for a TCP client to connect to the TCP server represented by this instruction. .P The instruction loops on each TCP client connection until it receives a signal to terminate. While the instruction is executed, the TCP listening port is opened. .P On each TCP connection: - it creates a process and attaches it to the specified scheduler, - initialise the memory of the kernel of the created process: - P references two addresses: the first one is the tcp.client device representing the client and the second one is the shared parameters, - the memory pointer and all its associated memory is instanciated in the new kernel, and the memory is shared between the server process and the new process. %}