Linux——利用命名管道创建进程池
发布日期:2025-04-07 09:54:17 浏览次数:7 分类:精选文章

本文共 8347 字,大约阅读时间需要 27 分钟。

利用命名管道创建进程池——技术实践指南

第1步:创建管道

首先,我们需要准备一些命名管道来构建进程池。为此,我编写了一个函数CreateChannels,用于创建一批命名管道。这是一个典型的案例:

#pragma once#include 
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
const int MY_CHANNEL_NUMBER = 10;const std::string base_name = "../my_fifo";void CreateChannels(std::vector
& channels) { for (int i = 0; i < MY_CHANNEL_NUMBER; ++i) { std::stringstream ss; ss << base_name << "_" << i; std::string name = ss.str(); if (mkfifo(name.c_str(), 0666) == -1) { if (errno != EEXIST) { perror("create channels fail"); exit(EXIT_FAILURE); } else { std::cout << "channel has existed" << std::endl; channels.push_back(Channel(name)); } } else { channels.push_back(Channel(name)); } }}

进一步优化:检查错误状态

为了确保管道不存在重复,我们可以检查errno状态:

#pragma once#include 
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
const int MY_CHANNEL_NUMBER = 10;const std::string base_name = "../my_fifo";void CreateChannels(std::vector
& channels) { for (int i = 0; i < MY_CHANNEL_NUMBER; ++i) { std::stringstream ss; ss << base_name << "_" << i; std::string name = ss.str(); int fd = mkfifo(name.c_str(), 0666); if (fd == -1) { if (errno != EEXIST) { perror("create channels fail"); exit(EXIT_FAILURE); } else { std::cout << "channel has existed" << std::endl; channels.push_back(Channel(name)); } } else { channels.push_back(Channel(name)); } }}

第2步:批量删除管道

类似地,我们可以编写一个函数DeletePipes,以删除已经创建好的管道:

void DeletePipes(std::vector
& channels) { for (auto& e : channels) { if (unlink(e.fifo_path.c_str()) == -1) { if (errno == ENOENT) { std::cout << e.fifo_path << " not exists" << std::endl; } else { perror("unlink fail"); exit(EXIT_FAILURE); } } }}

封装管道管理

为了更好地管理和编写管道代码,我建议将管道封装到一个类中。这样可以提高代码的可维护性和可扩展性:

struct Channel {    Channel(const std::string& fifo_name) : fifo_path(fifo_name) {}    std::string fifo_path;};void CreateChannels(std::vector
& channels) { int retry = 0; while (retry < 10 && channels.size() < MY_CHANNEL_NUMBER) { std::vector
names; for (int i = 0; i < MY_CHANNEL_NUMBER; ++i) { std::stringstream ss; ss << base_name << "_" << i; names.push_back(ss.str()); } for (auto& name : names) { int fd = mkfifo(name.c_str(), 0666); if (fd == -1) { if (errno != EEXIST) { perror("create channels fail"); exit(EXIT_FAILURE); } else { channels.push_back(Channel(name)); } } else { channels.push_back(Channel(name)); } } retry++; std::cout << "Channels being created... " << (100 - retry * 10) << "%完成" << std::endl; sleep(1); }}

第4步:模拟进程通信

为了模拟进程间通信,我们需要用一个主进程来管理子进程的工作。这通常涉及到forkexec等系统调用:

void DoingWork(std::vector
& channels, const std::string& work_name) { auto channel = ChooseChannel(channels); pid_t pid = fork(); if (pid == 0) { if (access(channel.fifo_path.c_str(), F_OK) == -1) { std::cout << "工作流程被终止:" << channel.fifo_path << std::endl; exit(EXIT_SUCCESS); } int rfd = open(channel.fifo_path.c_str(), O_RDONLY); if (rfd < 0) { perror("无法打开读管道"); exit(EXIT_FAILURE); } char buffer[1024]; ssize_t nread = read(rfd, buffer, sizeof(buffer)); if (nread <= 0) { std::cout << "读取失败:" << channel.fifo_path << std::endl; close(rfd); exit(EXIT_FAILURE); } Work(buffer); close(rfd); } else if (pid > 0) { int wfd = open(channel.fifo_path.c_str(), O_WRONLY); if (wfd < 0) { perror("无法打开写管道"); exit(EXIT_FAILURE); } ssize_t nwritten = write(wfd, work_name.c_str(), work_name.size()); if (nwritten < 0) { std::cout << "写入失败:" << channel.fifo_path << std::endl; close(wfd); exit(EXIT_FAILURE); } close(wfd); waitpid(pid, nullptr, W_UNKennigandroid blocker); }}

功能扩展:类封装

为了更高效地管理进程池,我建议将这些功能封装到一个主程序类中,便于管理和扩展:

class MainProcess {public:    MainProcess() = default;    void createChannels() {        for (int i = 0; i < 10; ++i) {            std::stringstream ss;            ss << "../my_fifo_" << i;            if (mkfifo(ss.str().c_str(), 0666) == 0) {                channels.push_back(Channel(ss.str()));            } else {                if (errno != EEXIST) {                    bs:: cout << "创建管道失败:" << ss.str() << std::endl;                    exit(EXIT_FAILURE);                } else {                    channels.push_back(Channel(ss.str()));                }            }        }    }private:    std::vector
channels;};

模拟进程任务调度

为了验证进程池的功能,我编写了一个简单的任务调度逻辑:

std::string SeleteWork() {    std::random Device rd;    std::mt19937 gen(rd());    std::uniform_int_distribution
dis(0, 4); int task_number = dis(gen); static const std::vector
task_list = { "下载视频", "解压视频", "转换视频格式", "观看视频", "学习编程" }; return task_list[task_number];}

完整代码示例

你可能会遇到一些实际编码问题,让我帮你解决!

_SDUTIL源文件:#pragma once#include 
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
struct Channel { Channel(std::string fifo_name) : fifo_path(fifo_name) {} std::string fifo_path;};class MainProcess {public: MainProcess() = default; void createChannels() { std::vector
channels; const int ChannelNumber = 5; for(int i = 0; i < ChannelNumber; ++i) { std::string name = "../myfifo_" + std::to_string(i); if (mkfifo(name.c_str(), 0666) == 0) { channels.push_back(Channel(name)); } else { if (errno != EEXIST) { std::cerr << "failed to create fifo: " << name << std::endl; exit(EXIT_FAILURE); } else { std::cout << "fifo exists: " << name << std::endl; channels.push_back(Channel(name)); } } } channels.size() = ChannelNumber; std::cout << "已创建" << ChannelNumber << "个管道" << std::endl; } void deleteChannels() { for (auto& c : channels) { if (unlink(c.fifo_path.c_str()) == -1) { if (errno == ENOENT) { std::cout << "删除成功:" << c.fifo_path << std::endl; } else { std::cerr << "删除失败:" << c.fifo_path << std::endl; } } } std::cout << "所有管道已被删除" << std::endl; } const Channel& chooseChannel() { static std::random_device rd; static std::mt19937 gen(rd()); std::uniform_int_distribution
dis(0, channels.size() - 1); return channels[dis(gen)]; } const std::string& selectTask() { static std::random_device rd; static std::mt19937 gen(rd()); std::uniform_int_distribution
dis(0, 4); int choose = dis(gen); static const std::vector
taskList = { "下载视频", "解压视频", "转换视频格式", "观看视频", "学习编程" }; return taskList[choose]; } void doingWork() { std::string task = selectTask(); std::cout << "启动执行任务:" << task << std::endl; pid_t pid = fork(); if (pid == 0) { // 子进程读取任务并执行 std::string fifo_path = chooseChannel().fifo_path; if (access(fifo_path.c_str(), F_OK) == -1) { std::cerr << " fifo path not exists: " << fifo_path << std::endl; return EXIT_SUCCESS; } int rfd = ::open(fifo_path.c_str(), O_RDONLY); if (rfd == -1) { std::cerr << "无法打开读取端" << std::endl; return EXIT_SUCCESS; } char buffer[1024]; ssize_t bytes_read = ::read(rfd, buffer, sizeof(buffer)); if (bytes_read <= 0) { std::cerr << "读取失败:" << fifo_path << std::endl; return EXIT_SUCCESS; } buffer[bytes_read] = '\0'; Work(buffer); close(rfd); } else { // 父进程写入任务 int wfd = ::open(chooseChannel().fifo_path.c_str(), O_WRONLY); if (wfd == -1) { std::cerr << "无法打开写进入道" << std::endl; return EXIT_SUCCESS; } bytes_written = ::write(wfd, task.c_str(), task.size()); if (bytes_written < 0) { std::cerr << "写入失败:" << chooseChannel().fifo_path << std::endl; return EXIT_SUCCESS; } close(wfd); waitpid(pid, nullptr, W_UNBLOCKED); } }};

总结

通过以上步骤,我们成功创建并管理了一个基于命名管道的进程池系统。这种方法通过封装管理管道,实现了进程间的安全通信和资源共享。在实际应用中,可以根据具体需求进行调整,例如处理更复杂的任务和错误情况。此外,也可以将这些功能扩展到更多的操作系统命令和功能中。希望本文能为大家提供一个清晰的指导,帮助您成功实现进程池管理!

上一篇:Linux——动态库
下一篇:Linux——共享内存

发表评论

最新留言

很好
[***.229.124.182]2025年04月26日 04时13分02秒