MPI培训文档¶
约 2162 个字 430 行代码 4 张图片 预计阅读时间 13 分钟
MPI入门¶
参考链接:
- Tutorials · MPI Tutorial
- Open MPI 入门笔记 | JinBridge
- Coding Games and Programming Challenges to Code Better
- MPI之聚合通信-Scatter,Gather,Allgather
学习工具:
-
MPI概述
- 什么是MPI?
MPI(Message Passing Interface,消息传递接口)为在分布式内存架构下的进程间通信提供了规范和库支持。在程序的角度,MPI就是一系列函数接口,他们可以实现不同进程(不同内存区域)之间的消息传递
- 适用场景:分布式内存并行模型
-
MPI编程模型
-
分布式内存模型
在分布式内存模型中,各个处理节点可以独立运行自己的进程,使用自己的本地内存来存储和处理数据。每个进程的内存是私有的,其他进程无法直接访问它们。如果一个进程需要访问另一个进程的数据,就必须通过显式的消息传递机制将数据从一个进程发送到另一个进程。同一个节点(服务器)内部需要借助高速数据总线等硬件实现,而跨节点的通信通常由网络连接来实现,比如通过高速以太网、IB(InfiniBand)等。
-
MPI的核心概念
- 进程: 在MPI中,每个计算任务由一个或多个进程执行。进程是独立的计算实体,有自己的内存空间。MPI程序通常启动多个进程,这些进程在分布式内存系统中运行。
- 通信: MPI通过消息传递的方式进行进程间通信。主要有两种通信方式:
- 点对点通信(Point-to-Point Communication): 两个进程之间直接传递消息。例如,进程A发送数据给进程B。
- 集体通信(Collective Communication): 多个进程之间进行数据传递或同步操作。例如,广播(broadcast)、归约(reduce)等操作。
- 通信协议: MPI提供了多种通信协议,如阻塞通信(Blocking)、非阻塞通信(Non-blocking)、同步通信(Synchronous)等。
-
-
安装与运行MPI
好用的工具
建议通过文章开头提到的学习工具,实践一下MPI的安装和使用。
-
安装 标准开发环境已经安装好了
openmpi
,你也可以通过下述命令安装:sudo apt-get install openmpi-bin libopenmpi-dev
使用
mpicc -v
确认安装完成 -
普通程序/MPI程序的编译命令
语言 C C++ Fortran 77 Fortran 90 编译器 gcc g++ gfortran gfortran MPI编译器 mpicc mpic++/mpicxx/mpiCC mpif77 mpif90 -
编译并运行MPI程序
mpicc mpi_test.c -o mpi_test #编译 mpirun -np 4 ./mpi_test #运行(-np用于指定处理器数量)
-
MPI基础函数接口¶
-
初始化与终止
-
MPI_Init
:初始化MPI环境,必须在任何MPI调用之前调用。MPI_Init(&argc, &argv);
在 MPI_Init 的过程中,所有 MPI 的全局变量或者内部变量都会被创建。一个通讯子(communicator)会根据所有可用的进程被创建出来,然后每个进程会被分配独一无二的秩(rank)
-
MPI_Finalize
:结束MPI环境,释放MPI使用的资源。MPI_Finalize();
-
-
获取进程信息
-
MPI_Comm_size
:获取通信子(communicator)中进程的总数。int world_size; MPI_Comm_size(MPI_COMM_WORLD, &world_size);
-
MPI_Comm_rank
:获取当前进程在通信子中的编号(从0开始)。int world_rank; MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
-
-
点对点通信
MPI_Send
:发送消息到指定的进程。MPI_Recv
:接收来自指定进程的消息。
//接口细节 int MPI_Send(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm); /* buf: 发送数据的起始地址。 count: 要发送的数据元素个数。 datatype: 数据类型(如MPI_INT、MPI_FLOAT)。 dest: 目标进程的编号。 tag: 消息标识,用于匹配接收消息。 comm: 通信子 */ //示例 int number = 42; MPI_Send(&number, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
//接口细节 int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status); /* buf: 接收数据的起始地址。 count: 可接收的数据元素个数。 datatype: 数据类型(如MPI_INT、MPI_FLOAT)。 source: 源进程的编号。 tag: 消息标识,用于匹配接收消息。 comm: 通信子。 status: 返回状态信息的结构体指针(可以传递MPI_STATUS_IGNORE忽略)。 */ //示例 int number; MPI_Recv(&number, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-
集合通信
-
MPI_Bcast
:将一条消息从一个进程广播到通信子中的所有进程。 -
MPI_Scatter
:将根进程的数据分散(scatter)到所有进程中。每个进程接收根进程提供的数据的一部分。 -
MPI_Reduce
:对来自所有进程的数据进行归约操作(如求和、求最小值),并将结果发送到根进程。 -
MPI_Gather
:将各进程的数据收集到根进程中。 -
MPI_Allgather
:将所有进程的部分数据汇总到所有进程。每个进程在所有进程中接收到所有其他进程的数据。
//接口细节 int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm) /* buffer: 广播数据的起始地址。 count: 数据元素个数。 datatype: 数据类型(如MPI_INT、MPI_FLOAT)。 root: 广播的源进程编号。 comm: 通信子。 */ //示例 int data = 100; MPI_Bcast(&data, 1, MPI_INT, 0, MPI_COMM_WORLD);
//接口细节 int MPI_Scatter(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm); /* sendbuf: 发送数据的起始地址。 send_count:具体需要给每个进程发送的数据的个数 sendtype:数据类型。 recvbuf: 接收结果的地址(仅根进程使用)。 recvcount: 接受数据元素个数。 recvtype: 数据类型。 root: 接收结果的根进程编号。 comm: 通信子。 */
//接口细节 int MPI_Reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm); /* sendbuf: 发送数据的起始地址。 recvbuf: 接收结果的地址(仅根进程使用)。 count: 数据元素个数。 datatype: 数据类型。 op: 归约操作(如MPI_SUM、MPI_MIN)。 root: 接收结果的根进程编号。 comm: 通信子。 */ //示例 int sum; int local_data = 5; MPI_Reduce(&local_data, &sum, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
//接口细节 int MPI_Gather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm); /* sendbuf: 发送数据的起始地址。 sendcount: 每个进程发送的数据元素个数。 sendtype: 发送数据的数据类型。 recvbuf: 接收数据的地址(仅根进程使用)。 recvcount: 每个进程发送的数据元素个数(接收方)。 recvtype: 接收数据的数据类型。 root: 接收结果的根进程编号。 comm: 通信子。 */ //示例 int local_data = 5; int gathered_data[4]; MPI_Gather(&local_data, 1, MPI_INT, gathered_data, 1, MPI_INT, 0, MPI_COMM_WORLD);
//接口细节 int MPI_Allgather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm); /* sendbuf: 每个进程要发送的数据的缓冲区指针。 sendcount: 每个进程发送的数据元素个数。 sendtype: 发送数据的类型。 recvbuf: 每个进程接收数据的缓冲区的指针。 recvcount: 每个进程接收的数据元素个数。 recvtype: 接收数据的类型。 comm: 通信子。 */
-
MPI中的同步与异步通信
-
阻塞通信 vs. 非阻塞通信
- 阻塞通信是指在通信操作完成之前,调用该通信函数的进程会被阻塞(即等待)。这意味着在通信操作完成之前,进程无法继续执行后续的操作。这种通信方法实现较为简单,但是可能会导致进程等待,特别是在进行大量通信操作时影响性能。适用于简单的通信场景。
- 非阻塞通信允许进程在发送或接收数据的同时继续执行其他计算任务。通信操作的完成会在稍后的时间自动进行,可以与计算任务重叠,提高性能,但是编程复杂度较高,需要显式检查通信完成状态。
选学部分
阻塞通信:
-
MPI_Send
-
MPI_Recv
非阻塞通信:
-
MPI_Isend
非阻塞地发送数据。 -
MPI_Irecv
非阻塞地接收数据。 -
MPI_Wait
用于确保非阻塞操作完成之后再继续执行后续代码。
MPI中的数据类型与通讯域/通信子/通讯器:
-
MPI_Datatype
:MPI提供了内置和自定义数据类型,用于定义数据的格式和结构。自定义数据类型允许更复杂的数据组织和传输。 -
MPI_Comm
:MPI中的通信域定义了进程的集合,这些进程可以在同一通信域内进行数据交换。MPI提供了默认通信域和创建自定义通信域的功能,以支持不同的并行计算模式和需求。
MPI实验¶
-
尝试写一个MPI的Hello world程序,并输出每个进程的ID。
代码提示
#include <mpi.h> #include <stdio.h> int main(int argc, char** argv) { //==== TODO:初始化MPI环境 ==== int world_size; // 总进程数 int world_rank; // 当前进程的排名(ID) char processor_name[MPI_MAX_PROCESSOR_NAME]; // 处理器名称 int name_len; // 名称长度 //==== TODO:获取总进程数 ==== //==== TODO:获取当前进程的排名 ==== // 获取处理器名称 MPI_Get_processor_name(processor_name, &name_len); printf("Hello world from processor %s, rank %d out of %d processors\n", processor_name, world_rank, world_size); //==== TODO:清理MPI环境 ==== return 0; }
-
尝试从进程0发送一个数据(如数字42)到进程1,并在进程1接收时输出该内容。
代码提示
#include <mpi.h> #include <stdio.h> #include <stdlib.h> int main(int argc, char** argv) { //==== TODO:初始化MPI环境 ==== //==== TODO:获取总进程数与当前进程排名 ==== if (world_size < 2) { if (world_rank == 0) { printf("需要至少2个进程\n"); } //==== TODO:清理MPI环境 ==== return 1; } //==== TODO:如果是进程0,则向进程1发送一个消息(如数字42) ==== //==== TODO:如果是进程1,则从进程0接受一个消息,并输出收到的内容 ==== //==== TODO:清理MPI环境 ==== return 0; }
-
尝试从进程0广播一段数据,并让每个进程输出接收到的值。
代码提示
#include <mpi.h> #include <stdio.h> #include <stdlib.h> int main(int argc, char** argv) { //==== TODO:初始化MPI环境 ==== //==== TODO:获取总进程数与当前进程排名 ==== int* data = NULL; //实际数据 int data_size; //数据大小 if (world_rank == 0) { // 主进程初始化数据 data_size = 5; data = (int*)malloc(data_size * sizeof(int)); for (int i = 0; i < data_size; i++) data[i] = i + 1; printf("进程 0 广播数据: "); for (int i = 0; i < data_size; i++) printf("%d ", data[i]); printf("\n"); } //==== TODO:广播数据大小 ==== // 分配缓冲区 if (world_rank != 0) { data = (int*)malloc(data_size * sizeof(int)); } //==== TODO:广播实际数据 ==== //==== TODO:所有进程打印自己的rank和接收到的数据 ==== free(data); //==== TODO:清理MPI环境 ==== return 0; }
-
尝试编写一个计算向量点积的程序,并使用MPI进行并行,在“代码提示”中已给出串行计算的代码。提示:使用MPI_Scatter和MPI_Reduce
代码提示
#include <mpi.h> #include <stdio.h> #include <stdlib.h> #include <math.h> #include <time.h> int main(int argc, char** argv) { MPI_Init(&argc, &argv); int rank, size; MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); const int vector_size = 1000000; const int local_size = vector_size / size; float *full_A = NULL; float *full_B = NULL; if (rank == 0) { full_A = (float*)malloc(vector_size * sizeof(float)); full_B = (float*)malloc(vector_size * sizeof(float)); // 使用固定种子初始化完整向量(保证可重复性) srand(12345); for (int i = 0; i < vector_size; i++) { full_A[i] = (float)rand() / RAND_MAX; full_B[i] = (float)rand() / RAND_MAX; } } float *local_A = (float*)malloc(local_size * sizeof(float)); float *local_B = (float*)malloc(local_size * sizeof(float)); //============ TODO:并行计算 ============ //... //... //... //============串行计算============ if (rank == 0) { double serial_dot = 0.0; for (int i = 0; i < vector_size; i++) { serial_dot += (double)full_A[i] * (double)full_B[i]; } double abs_error = fabs(global_dot - serial_dot); printf("并行点积: %.16f\n", global_dot); printf("串行点积: %.16f\n", serial_dot); printf("绝对误差: %.6e\n", abs_error); free(full_A); free(full_B); } free(local_A); free(local_B); MPI_Finalize(); return 0; }
-
蒙特卡洛算法估算\(\pi\),请使用MPI进行并行优化,已在“代码提示”中给出串行版本的实现。
代码提示
#include <stdio.h> #include <stdlib.h> #include <math.h> #include <time.h> int main() { const long total_samples = 100000000; long count_in_circle = 0; srand(time(NULL)); for (long i = 0; i < total_samples; i++) { double x = (double)rand() / RAND_MAX * 2.0 - 1.0; double y = (double)rand() / RAND_MAX * 2.0 - 1.0; if (x*x + y*y <= 1.0) { count_in_circle++; } } double pi_estimate = 4.0 * (double)count_in_circle / total_samples; printf("估算π = %.10f\n", pi_estimate); printf("实际π = %.10f\n", M_PI); printf("样本数 = %ld\n", total_samples); return 0; }
mpi4py¶
参考链接
mpi4py是一个Python库,提供了MPI(消息传递接口)的绑定,使得Python程序能够进行并行计算。它支持多种并行模式,包括点对点通信、集合通信(如广播、分散和聚集)以及使用NumPy数组的快速缓冲区通信。
尽管python中自带有multiprocessing库实现多进程,但是其只能在单机上实现并行。如果需要使用多机的并行仍需使用mpi4py。
-
安装mpi4py:
pip install mpi4py
-
使用方法:
from mpi4py import MPI
常用函数¶
Info
由于mpi4py的函数与上文MPI的内容重复度较高,故不再赘述。
这里仅给出部分常用函数,若有其他需求,可通过参考链接自行查阅。
-
基础内容
# 通信器 MPI.COMM_WORLD # 包含所有进程 # 进程标识: rank = comm.Get_rank() #当前进程ID(0到size-1) size = comm.Get_size() #总进程数
-
点对点通信
# 阻塞通信 comm.Send(data, dest_rank, tag=0) # 发送 data = comm.Recv(source_rank, tag=0) # 接收 # 非阻塞通信 req = comm.Isend(data, dest_rank) # 异步发送 req = comm.Irecv(source_rank) # 异步接收 req.wait() # 等待完成
-
集体通信
# Bcast data = comm.Bcast(data, root=0) # Scatter local_data = comm.Scatter(data_list, root=0) # Reduce & Allreduce sum_all = comm.Reduce(local_val, op=MPI.SUM, root=0) sum_global = comm.Allreduce(local_val, op=MPI.SUM) # Gather & Allgather(数据大小必须相同) gathered = comm.Gather(sendbuf, recvbuf, root=0) all_data = comm.Allgather(sendbuf, recvbuf) # Gatherv & Allgatherv (可处理不同大小的数据) gathered_v = Gatherv(sendbuf, recvbuf, root=0) all_vector = Allgatherv(sendbuf, recvbuf)
mpi4py实验¶
运行实验
使用如下命令:
mpiexec -np 4 python {YOUR_CODE_NAME}.py
请将"{YOUR_CODE_NAME}"替换为你的文件名称
-
Hello,mpi4py.
- 编写程序运行后打印自己的rank与当前的size并退出。
没有提示
并没有提示,我觉得你能自己做到的!
-
mpi4py库中的点对点通信(ping - pong)
- 编写程序让程序开始运行时0号进程发送'ping'信息给1号进程,1号进程发送'pong'信息给0号进程
- 1号进程每次收到0号进程消息后打印输出并等待1s后返回'pong'信息给0号进程
- 0号进程每次收到1号进程消息后打印输出并等待1s后返回'ping'信息给0号进程
代码提示
from mpi4py import MPI import time import numpy as np def main(): comm = MPI.COMM_WORLD # ==== TODO:获取rank和size ==== # 确保至少有2个进程 if size < 2: if rank == 0: print("Error: Need at least 2 processes") return if rank == 0: # ==== TODO:进程0:发送ping,接收pong ==== time.sleep(1) elif rank == 1: # ==== TODO:进程1:发送pong,接收ping ==== time.sleep(1) if __name__ == "__main__": main()
-
mpi4py库中的集合通信
- 编写函数mpi_np_gather(send_buffer: np.ndarray)-> np.ndarray。
- 实现功能:send_buffer为将发送二维的np.ndarray对象,所有进程发送的np.ndarray对象列数相同,行数可以不同。返回值为各个程序的发送的np.ndarray对象按照rank的顺序拼接而成的np.ndarray数组。
-
示例:
# 假设size=4 if rank == 0: mpi_np_gather(np.array([[0,1]])) elif rank == 1: mpi_np_gather(np.array([[1,0],[2,3]])) elif rank == 2: mpi_np_gather(7*np.ones((3,2))) elif rank == 3: print(mpi_np_gather(np.array[[114, 514]]) ''' 输出结果: [[0 1] [1 0] [2 3] [7 7] [7 7] [7 7] [114 514]] '''
代码提示
from mpi4py import MPI import numpy as np def mpi_np_gather(send_buffer: np.ndarray) -> np.ndarray: comm = MPI.COMM_WORLD # ==== TODO:获取rank和size ==== # ==== TODO:想办法计算最终接收的总行数 ==== # 计算接收缓冲区中的位移 displacements = [0] for i in range(1, size): displacements.append(displacements[i-1] + recv_counts[i-1]) # ==== TODO:创建接收缓冲区(一维数组)==== # ==== TODO:用Allgatherv或gatherv接收数据 ==== # ==== TODO:返回正确的值 ==== if __name__ == "__main__": comm = MPI.COMM_WORLD rank = comm.Get_rank() if rank == 0: data = np.array([[0, 1]]) elif rank == 1: data = np.array([[1, 0], [2, 3]]) elif rank == 2: data = 7 * np.ones((3, 2), dtype=int) elif rank == 3: data = np.array([[114, 514]]) result = mpi_np_gather(data) if rank == 3: print("Gathered result:") print(result)