#define _GNU_SOURCE #include "network.h" #include "swap.h" #include "http/http_rel.h" #include "cJSON.h" #include "tools/log/log.h" #include "tools/quit/quit.h" #include "erroprocess/erroprocess.h" #include #include #include #include #include #include #include #include #include #include #include #include static void safe_strcpy(char *dst, size_t dst_size, const char *src) { if (!src) { dst[0] = '\0'; return; } size_t len = strlen(src); if (len >= dst_size) len = dst_size - 1; memcpy(dst, src, len); dst[len] = '\0'; } /* 主解析 */ int rbt_parse_json(const char *json_text, rbt_msg *out) { memset(out, 0, sizeof(*out)); // 统一清 0,gid 天然 '\0' cJSON *root = cJSON_Parse(json_text); if (!root) return -1; /* 1. 取群号(可能没有) */ cJSON *gid = cJSON_GetObjectItemCaseSensitive(root, "group_id"); if (cJSON_IsString(gid)) safe_strcpy(out->gid, sizeof(out->gid), gid->valuestring); else if (cJSON_IsNumber(gid)) // 有些框架是数字 snprintf(out->gid, sizeof(out->gid), "%d", gid->valueint); /* 2. 用户号 */ cJSON *uid = cJSON_GetObjectItemCaseSensitive(root, "user_id"); if (cJSON_IsString(uid)) safe_strcpy(out->uid, sizeof(out->uid), uid->valuestring); else if (cJSON_IsNumber(uid)) snprintf(out->uid, sizeof(out->uid), "%d", uid->valueint); /* 3. 昵称在 sender 对象里 */ cJSON *sender = cJSON_GetObjectItemCaseSensitive(root, "sender"); if (cJSON_IsObject(sender)) { cJSON *nick = cJSON_GetObjectItemCaseSensitive(sender, "nickname"); safe_strcpy(out->nickname, sizeof(out->nickname), cJSON_IsString(nick) ? nick->valuestring : NULL); } /* 4. 原始消息 */ cJSON *raw = cJSON_GetObjectItemCaseSensitive(root, "raw_message"); safe_strcpy(out->raw_message, sizeof(out->raw_message), cJSON_IsString(raw) ? raw->valuestring : NULL); /* 5. 消息类型 */ cJSON *type = cJSON_GetObjectItemCaseSensitive(root, "message_type"); if (cJSON_IsString(type)) { if (strcmp(type->valuestring, "group") == 0) out->message_type = 'g'; else if (strcmp(type->valuestring, "private") == 0) out->message_type = 'p'; /* else 保持 0 */ } cJSON_Delete(root); return 0; // 成功 } ssize_t read_req(int fd, void *buf) { // TODO 修改读取任务函数 ssize_t n = read(fd, buf, NET_MAX_MESSAGE_BUF); if (n == 0) /* 写端已关闭,管道永不会再有数据 */ return 0; return (n > 0) ? n : -1; } int process_message(char *req, log_manager *logger,rbt_msg *swap) { if(req == NULL) return 0; int fd; char type[16], end[16]; if(sscanf(req, "%15s/%d/%15s", type, &fd, end) != 3) { free(req); return -1; } const char *body = recv_http_request(fd); if(rbt_parse_json(body,swap) == 0) { logs *log = malloc(sizeof(logs)); // cppcheck-suppress uninitdata snprintf(log->log, sizeof(log->log), "%s message %s processed ok\n", swap->nickname,swap->raw_message); make_swap(swap); logger->in_log(log, logger); } const char *response = "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: 2\r\n" "\r\n" "OK"; write(fd, response, strlen(response)); close(fd); free(req); return 0; } int iss_work(netm *self,char *command) { int i = self->last_alc; //查询空闲线程 while(atomic_load(&(self->pool[i].status)) ==0) { if(ipool[i].fifo_fd[0],command,strlen(command)); //设置线程程为working atomic_fetch_sub(&self->pool[i].status,1); self->last_alc = i; } void *pth_module(void *args_p) { net_args *argms = (net_args*)args_p; pth_m *pmd = argms->pth; log_manager *logger = argms->log; //参数解析 free(args_p); char name[256] = {'\0'}; sprintf(name,"chatrebot%lu",pthread_self()); int swapfd = create_swap(name); //创建共享内存 char swap_arg[64] = {'\0'}; sprintf(swap_arg,"%d",swapfd); pid_t id = fork(); if(id == 0) { char *args[]={ "Pluginmanager", "--swap",swap_arg, NULL}; execv("Run_pluhginmanager",args); } logs *pth_log = (logs*)malloc(sizeof(logs)); // cppcheck-suppress uninitdata sprintf(pth_log->log,"PID:%lu launched python plugines\n",pthread_self()); logger->in_log(pth_log,logger); rbt_msg *swap = (rbt_msg*)mmap(NULL, sizeof(rbt_msg), PROT_READ|PROT_WRITE, MAP_SHARED,swapfd, 0); //拉起python插件管理器 for(;;){ //线程池中,单个线程模型 char *req = (char*)malloc(NET_MAX_MESSAGE_BUF); //从管道中读取请求,并解析,无内容时休眠 int n = read_req(pmd->fifo_fd[0],(void*)req); //管道关闭时退出; if (n == EOF) { return NULL; break; } else{ pth_log = (logs*)malloc(sizeof(logs)); sprintf(pth_log->log,"processd message"); logger->in_log(pth_log,logger); process_message(req,logger,swap); atomic_fetch_add(&pmd->status, 1); } } } int start_pool(netm *self) { for(int i = 0;ipool[i].fifo_fd); //启动线程 net_args *arg = (net_args*)malloc(sizeof(net_args)); arg->pth = &self->pool[i]; arg->log = self->logmanager; self->pool[i].status = 1; pthread_create(&self->pool[i].pthread_id,NULL,pth_module,(void*)arg); } } int shutdown_pool(netm *self) { for(int i = 0;ipool[i].status == -1) continue; self->pool[i].status = -1; close(self->pool[i].fifo_fd[1]); } return 1; } int server_run(int port,int fifo_fd,netm *self) { int epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd == -1) { perror("epoll_create1"); exit(EXIT_FAILURE); } struct epoll_event ev; //设置epoll同时监听控制管道与http请求 ev.events = EPOLLIN; ev.data.fd = fifo_fd; epoll_ctl(epfd, EPOLL_CTL_ADD, fifo_fd, &ev); char iss_buf[256]; self->http_fd = init_http_network(port,self->logmanager); ev.data.fd = self->http_fd; epoll_ctl(epfd, EPOLL_CTL_ADD, self->http_fd, &ev); struct epoll_event events[10]; self->epoll_fd = epfd; for(;;) { /*工作循环-----------------------------*/ int nf = epoll_wait(epfd,events,10,-1); printf("%d\n",nf); if (nf == -1) { perror("epoll_wait"); break; } for(int i = 0; ihttp_fd) { int nt_fd = accept4(self->http_fd,NULL,NULL,SOCK_NONBLOCK | SOCK_CLOEXEC); printf("%d\n",nt_fd); if(nt_fd == -1) continue; sprintf(iss_buf,"s/%d/e",nt_fd); self->iss_work(self,iss_buf); } if(events[i].data.fd == fifo_fd) { char command; while(read(fifo_fd,&command,1)==1) { switch(command){ case 'q': //退出逻辑 quit_server(self); return 1; break; case 'u': //插件更新逻辑 break; } } } } /*工作循环----------------------------*/ } } void *run_network(void *self_d) { netm *self = (netm*)self_d; self->start_pool(self); server_run(self->port,self->fifo_fd[0],self); self->shutdown_pool(self); } int init_networkmanager(netm *self,int *fifo,log_manager *logmanager,int port) { self->run_network = run_network; self->iss_work = iss_work; self->start_pool = start_pool; self->shutdown_pool = shutdown_pool; //装载方法 self->fifo_fd[0]= fifo[0]; self->fifo_fd[1]= fifo[1]; self->last_alc = 0; self->port = port; //初始化参数 self->logmanager = logmanager; self->err_indictor = (indiector*)malloc(sizeof(indiector)); return 0; }