MPI并行计算 - mpi4py

博客参考:

代码参考:

书籍参考:

一、MPI简介

1、MPI基础概念

MPI 的全称是 Message Passing Interface,即消息传递接口,它是一套并行运算中信息传递和处理的标准。消息传递指的是并行执行的各个进程具有自己独立的堆栈和代码段,作为互不相关的多个程序独立执行,进程之间的信息交互完全通过显式地调用通信函数来完成。消息可以理解成带有一些信息和数据的一个数据结构。

我会先解释一下 MPI 在消息传递模型设计上的一些经典概念。第一个概念是通讯器(communicator,通讯组)。通讯器定义了一组能够互相发消息的进程。在这组进程中,每个进程会被分配一个序号,称作秩(rank),进程间显性地通过指定秩来进行通信

通信的基础建立在不同进程间发送和接收操作。一个进程可以通过指定另一个进程的以及一个独一无二的消息标签(tag)来发送消息给另一个进程。接受者可以发送一个接收特定标签标记的消息的请求(或者也可以完全不管标签,接收任何消息),然后依次处理接收到的数据。类似这样的涉及一个发送者以及一个接受者的通信被称作点对点(point-to-point)通信

当然在很多情况下,某个进程可能需要跟所有其他进程通信。比如主进程想发一个广播给所有的从进程。在这种情况下,手动去写一个个进程点对点的信息传递就显得很笨拙。而且事实上这样会导致网络利用率低下。MPI 有专门的接口来帮我们处理这类所有进程间的集体性(collective)通信

更多解释参考

2、MPI与其他概念的比较(OpenMP,RPC)

这套标准有很多种实现,比如C++,Fortran的OpenMPI,MS-MPI和MPICH;以及提供了各种编程接口,比如Python的mpi4py(需要安装MS-MPI,mpi4j才可以调用静态库),Matlab-MPI等等。在这些程序中调用响应的库来实现程序的并行化

MPI的竞品包括 RPC,Distributed Shared Memory 等。关于它们的比较可以参考论文 Message Passing, Remote Procedure Calls and Distributed Shared Memory as Communication Paradigms for Distributed Systems

MP

消息传递是大多数分布式系统中内部进程间进行信息交互的基础,是最低层次的抽象,它需要应用程序员能够识别接受进程,消息,发送进程以及数据类型。主要包括两个原语:

  • send(receiver,message):在进程发送消息时,进程是阻塞的,当消息已发送出去并且消息缓冲有空闲时,他才会execute
  • receive(sender, message):在进程接收消息时,进程是阻塞的,直到接受到消息。

RPC

在消息传递中,程序员需要明确控制数据的移动,RPC远端进程调用为进程间信息交互增加了一层抽象,减轻了程序员负担。

  • call procedure_name(value_arguments; result_arguments)

  • receive procedure_name(in value_parameters; out result_parameters)

  • reply(caller, result_parameters)

call会一直block直到reply接收到数据。远端要调用的进程是远端服务器中正在执行的服务器进程,这个服务器进程会被receive()阻塞,直到该进程接收到发送进程传过来的信息和参数。然后当服务器完成好任务之后,它会发送reply()。

DSM

分布式共享内存(Distributed Shared Memory)是更高一层的抽象,DSM增加了操作系统的复杂度但却为应用开发者带来了便利。这些内存可以通过虚拟地址进行访问,因此这些进程能够在内存中直接寻址完成数据读取和修改。

虽然DSM能够简化编程,减轻程序员对MP的操作,但是MP仍然需要用到,在操作系统中必须在不同的机器之间发送消息用于共享内存的请求(而不是本地内存),而且必须保持内存中的数据和备份数据之间的一致性。

  • read(shared_variable)
  • write(data, shared_variable)

MPI 提出了这一系列为了解决进程间消息传递问题而存在的接口,但它们需要一个实现。OpenMPI 是 MPI 的常用实现之一。因此我们可以理解,MPI 是定义,是接口,而 OpenMPI 是这一接口的对应实现。这里还有一个容易混淆的概念,就是 OpenMP。OpenMP(Open Multi-Processing)与 OpenMPI,MPI 并无任何关系。它是一个针对共享内存并行编程的 API。这里特意提出,避免混淆。

并行处理框架主要有MPI、OpenMP和MapReduce(Hadoop)三个(CUDA属于GPU并行编程,这里不提及)。MPI比大多数并行框架要底层,MPI和Hadoop都可以在集群中运行,而OpenMP因为共享存储结构的关系,不能在集群上运行,只能单机。另外,MPI可以让数据保留在内存中,可以为节点间的通信和数据交互保存上下文,所以能执行迭代算法,而Hadoop却不具有这个特性。因此,需要迭代的机器学习算法大多使用MPI来实现。当然了,部分机器学习算法也是可以通过设计使用Hadoop来完成的。

相比Python自带的Multiprocessing包,MPI语法更复杂,不过好处是,它扩展性更强,后期可以扩展到多节点分布式计算。而Multiprocessing只能打单机。

二、MPI 知识点 & mpi4py使用

1、安装

Mpi4py是构建在mpi之上的python库,使得python的数据结构可以在进程(或者多个cpu)之间进行传递。

from mpi4py import MPI

此时会出现错误ImportError: DLL load failed

因为本机缺乏MPI程序提供的静态文件,直接此处下载msmpisetup.exe并安装就可以了(MS-MPISDK,exe都安装,并配置mpiexec.exe的环境变量),保证mpi4py可以调用到。

安装完成之后可以尝试跑一个小小例子来验证一下它确实是装好了。

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    msg = 'Hello, world'
    comm.send(msg, dest=1)
elif rank == 1:
    s = comm.recv()
    print("rank %d: %s" % (rank, s))
else:
    print("rank %d: idle" % (rank)

运行方式,命令行里执行:

mpiexec -np 8 python mpi4j_test/code001_p2p.py   #开启8个进程(开更多的进程也可以)

运行结果

rank 5: idle
rank 4: idle
rank 6: idle
rank 3: idle
rank 2: idle
rank 7: idle
rank 1: Hello, world

2、点对点通讯

https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter3/12_Point-to-point_communication.html

1)概念

整个过程分为两部分,发送者发送数据,接收者接收数据,二者必须都指定发送方/接收方。

在mpi4py中:

comm.send()comm.recv() 函数都是阻塞的函数。他们会一直阻塞调用者,知道数据使用完成。同时在MPI中,有两种方式发送和接收数据:

  • buffer模式
  • 同步模式

在buffer模式中,只要需要发送的数据被拷贝到buffer中,执行权就会交回到主程序,此时数据并非已经发送/接收完成。在同步模式中,只有函数真正的结束发送/接收任务之后才会返回。

2)代码
  • 实现2个进程之间的数据传输
  • 注意:下面的程序必须至少使用2个进程运行,否则会出现异常(需要进程之间通信)
# 需要将这段代码保存成文件才能实现多进程程序的运行
import mpi4py.MPI as MPI
comm = MPI.COMM_WORLD  #通过命令行传入的参数np,调用MS-MPI获得一个通讯组,该通讯组定义了一组互相发消息的进程
comm_rank = comm.Get_rank()  #为每一个进程分配一个rank
comm_size = comm.Get_size()  #这组进程中共有comm_size个进程

print(f"comm_rank = {comm_rank}")   #会被执行comm_size次,因为该程序在MPI中会被调用多次,完成进程间的通信
print(f"comm_size = {comm_size}")

data_send = [comm_rank] * 4
comm.send( data_send, dest=(comm_rank + 1) % comm_size)   #进程i会给进程 (i+1) % size发消息 (循环队列)
# 如果comm_rank-1<0,会自动加comm_size变为正数
data_recv = comm.recv( source=(comm_rank - 1) % comm_size)  #进程i会接收进程(i - 1) % size发过来的消息(循环队列)
print( "my rank is %d, I received :" % comm_rank )
print( data_recv )

命令行里执行:

mpiexec -np 10 python mpi4j_test/code001_p2p1.py

控制台输出结果为:

comm_rank = 8
comm_size = 10
my rank is 8, I received :
[7, 7, 7, 7]
comm_rank = 7
comm_size = 10
my rank is 7, I received :
[6, 6, 6, 6]
comm_rank = 6
comm_size = 10
my rank is 6, I received :
[5, 5, 5, 5]
comm_rank = 3
comm_size = 10
my rank is 3, I received :
[2, 2, 2, 2]
comm_rank = 4
comm_size = 10
my rank is 4, I received :
[3, 3, 3, 3]
comm_rank = 5
comm_size = 10
my rank is 5, I received :
[4, 4, 4, 4]
comm_rank = 9
comm_size = 10
my rank is 9, I received :
[8, 8, 8, 8]
comm_rank = 0
comm_size = 10
my rank is 0, I received :
[9, 9, 9, 9]
comm_rank = 1
comm_size = 10
my rank is 1, I received :
[0, 0, 0, 0]
comm_rank = 2
comm_size = 10
my rank is 2, I received :
[1, 1, 1, 1]

集体通讯

集合通信允许在一个通信组内的多个进程之间同时传递数据,集合通信的语法和语意与点到点通信是一致的,不过集合通信只有阻塞版本。

经常用到的集合通信操作有以下这些:

  • 障同步(Barrier synchronization)操作;
  • 全局通信,包括广播(Broadcast),收集(Gather)和发散(Scatter);
  • 全局规约,包括求和,最大,最小等。

MPI 组通信和点到点通信的一个重要区别就是,在某个进程组内所有的进程同时参加通信,mpi4py 提供了方便的接口让我们完成 Python 中的组内集合通信,方便编程同时提高程序的可读性和可移植性。

1、广播通讯

https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter3/14_Collective_communication_using_broadcast.html

1)概念

在并行代码的开发中,我们会经常发现需要在多个进程间共享某个变量运行时的值,或操作多个进程提供的变量(可能具有不同的值)。

为了解决这个问题,使用了通讯数。举例说,如果进程0要发送信息给进程1和进程2,同时也会发送信息给进程3,4,5,6,即使这些进程并不需要这些信息。

另外,MPI库提供了在多个进程之间交换信息的方法,针对执行的机器做了优化。

将所有进程变成通讯者的这种方法叫做集体交流。因此,一个集体交流通常是2个以上的进程。我们也可以叫它广播——一个进程发送消息给其他的进程。mpi4py 模块通过以下的方式提供广播的功能:

buf = comm.bcast(data_to_share, rank_of_root_process)

这个函数将root消息中包含的信息发送给属于 comm 通讯组其他的进程,每个进程必须通过相同的 rootcomm 来调用它。

集体通讯允许组中的多个进程同时进行数据交流。在 mpi4py 模块中,只提供了阻塞版本的集体通讯(阻塞调用者,直到缓存中的数据全部安全发送。)

广泛应用的集体通讯应该是:

  1. 组中的进程提供通讯的屏障
  2. 通讯方式包括:
    • 将一个进程的数据广播到组中其他进程中
    • 从其他进程收集数据发给一个进程
    • 从一个进程散播数据散播到其他进程中
  3. 减少操作
2)代码
  • 将一个数据发送给所有的进程,每个进程都会得到这所有的数据
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD   #通过命令行传入的参数np,调用MS-MPI获得一个通讯组,该通讯组定义了一组互相发消息的进程
comm_rank = comm.Get_rank()   #为每一个进程分配一个rank
comm_size = comm.Get_size()  #这组进程中共有comm_size个进程

print(f"comm_rank = {comm_rank}")   #会被执行comm_size次,因为该程序在MPI中会被调用多次,完成进程间的通信
print(f"comm_size = {comm_size}")

if comm_rank == 0:
    data = [i for i in range(comm_size)]  #root进程初始化data_variable

data = comm.bcast( data if comm_rank == 0 else None, root=0 )  #为了发送消息,需要声明了一个广播
print( "rank %d, got : " % comm_rank )
print( data )  #接收进程通过data获得root节点广播的数据

命令行里执行

mpiexec -np 5 python mpi4j_test/code002_bcast.py

控制台输出

comm_rank = 0
comm_size = 5
rank 0, got :
[0, 1, 2, 3, 4]
comm_rank = 2
comm_size = 5
rank 2, got :
[0, 1, 2, 3, 4]
comm_rank = 3
comm_size = 5
rank 3, got :
[0, 1, 2, 3, 4]
comm_rank = 1
comm_size = 5
rank 1, got :
[0, 1, 2, 3, 4]
comm_rank = 4
comm_size = 5
rank 4, got :
[0, 1, 2, 3, 4]

在上面的代码中,root进程建立了一个列表,然后广播给所有的进程,因此所有的进程都会拥有这个列表(数据),从而实现了数据共享。
上面有一个问题,即这种方法进行数据广播的时间复杂度为O(N),如果要实现O(loh(N))的方法,需要使用规约树广播。具体解释的链接:https://blog.csdn.net/zouxy09/article/details/49031845

2、散播通讯

https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter3/15_Collective_communication_using_scatter.html

1)概念

scatter函数和广播很像,但是有一个很大的不同, comm.bcast 将相同的数据发送给所有在监听的进程, comm.scatter 可以将数据放在数组中,发送给不同的进程。下图展示了scatter的功能:

comm.scatter 函数接收一个array,根据进程的rank将其中的元素发送给不同的进程。比如第一个元素将发送给进程0,第二个元素将发送给进程1,等等。 mpi4py 中的函数原型如下:

recvbuf  = comm.scatter(sendbuf, rank_of_root_process)
2)代码
  • 将一份数据平分给所有的进程,比如说有10个数据,给2个进程,则每个进程可以得到5个数据
  • 注意:root也会得到自己散播出去的数据并进行处理
  • mpi4py可以无缝使用numpy,很方便
import mpi4py.MPI as MPI
import numpy as np

comm = MPI.COMM_WORLD   #通过命令行传入的参数np,调用MS-MPI获得一个通讯组,该通讯组定义了一组互相发消息的进程
comm_rank = comm.Get_rank()   #为每一个进程分配一个rank
comm_size = comm.Get_size()  #这组进程中共有comm_size个进程

if comm_rank == 0:
    # 一定要确保data的长度是np的数量
    data = np.random.rand(comm_size,3)
    # data = [i for i in range(comm_size)]
    # data = [[1], [2], [3], [4]]
    print( "all data by rank %d : " % comm_rank )
    print( data )
else:
    data = None

local_data = comm.scatter( data , root=0 )
print( "rank %d, got : " % comm_rank )
print( local_data )  #接收进程通过local_data获得root节点散播的数据

命令行输入

mpiexec -np 5 python mpi4j_test/code003_scatter.py

控制台输出

all data by rank 0 :
[[0.19222594 0.68314071 0.05664603]
 [0.49260859 0.51062194 0.8958247 ]
 [0.38953222 0.48038652 0.69827788]
 [0.67110165 0.88019167 0.65287105]
 [0.05903742 0.80521226 0.05810528]]
rank 0, got :
[0.19222594 0.68314071 0.05664603]
rank 2, got :
[0.38953222 0.48038652 0.69827788]
rank 4, got :
[0.05903742 0.80521226 0.05810528]
rank 1, got :
[0.49260859 0.51062194 0.8958247]
rank 3, got :
[0.67110165 0.88019167 0.65287105]

3、收集数据

https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter3/16_Collective_communication_using_gather.html

1)概念

gather 函数基本上是反向的 scatter ,即收集所有进程发送向root进程的数据mpi4py 实现的 gather 函数如下:

recvbuf = comm.gather(sendbuf, rank_of_root_process)

这里, sendbuf 是要发送的数据, rank_of_root_process 代表要接收数据进程。

2)代码
  • 在求一个数组的最大值的时候,可以让一个进程进行处理,也可以让多个进程同时处理,给每个进程一组特定的数据,求完之后再最最大值回收,可以减小root的数据处理压力,同时减小时间复杂度。
import mpi4py.MPI as MPI
import numpy as np

comm = MPI.COMM_WORLD   #通过命令行传入的参数np,调用MS-MPI获得一个通讯组,该通讯组定义了一组互相发消息的进程
comm_rank = comm.Get_rank()   #为每一个进程分配一个rank
comm_size = comm.Get_size()  #这组进程中共有comm_size个进程

if comm_rank == 0:
    data = np.random.rand(comm_size, 2)
    print( data )
else:
    data = None

local_data = comm.scatter( data, root=0 )  # root将raw数据散播到每个进程中,等待进程返回处理结果
if(comm_rank % 2 == 0):  #偶数进程将local_data置为-local_data
    local_data = -local_data # 对数据进行处理
else:  #奇数进程将local_data置为0
    local_data = 0  # 对数据进行处理
print( "rank %d got data and finished dealing." % comm_rank )  
print( local_data )  #接收进程通过local_data获得root节点散播的数据

# 由root=0进行数据的收集
# 因为需要进行收集工作,所以是最后执行完的。
combine_data = comm.gather( local_data, root = 0 )  # 数据处理完毕,该进程将结果发送给root
if comm_rank == 0:
    print(f"rank {comm_rank} gather data : {combine_data}" )

命令行输入

mpiexec -np 5 python mpi4j_test/code004_gather.py

控制台输出

rank 1 got data and finished dealing.
[0.37386323 1.58787098]
rank 4 got data and finished dealing.
[-0.42959221 -0.70688952]
rank 3 got data and finished dealing.
[1.24694741 1.95314574]
rank 2 got data and finished dealing.
[-0.15093183 -0.55118371]

[[0.97245788 0.8760742 ]
 [0.18693162 0.79393549]
 [0.15093183 0.55118371]
 [0.62347371 0.97657287]
 [0.42959221 0.70688952]]
rank 0 got data and finished dealing.
[-0.97245788 -0.8760742 ]
rank 0 gather data : [array([-0.97245788, -0.8760742 ]), array([0.37386323, 1.58787098]), array([-0.15093183, -0.55118371]), array([1.24694741, 1.95314574]), array([
-0.42959221, -0.70688952])]

4、规约(Reduce)

https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter3/18_The_reduction_operation.html

1)概念

comm.gather 一样, comm.reduce 接收一个数组,每一个元素是一个进程的输入,然后返回一个数组,每一个元素是进程的输出,返回给 root 进程。输出的元素中包含了简化(规约)的结果。

mpi4py 中,我们将简化操作定义如下:

comm.Reduce(sendbuf, recvbuf, rank_of_root_process, op = type_of_reduction_operation)

这里需要注意的是,这里有个参数 opcomm.gather 不同,它代表你想应用在数据上的操作, mpi4py 模块代表定义了一系列的简化操作,其中一些如下:

  • MPI.MAX : 返回最大的元素
  • MPI.MIN : 返回最小的元素
  • MPI.SUM : 对所有元素相加
  • MPI.PROD : 对所有元素相乘
  • MPI.LAND : 对所有元素进行逻辑操作
  • MPI.MAXLOC : 返回最大值,以及拥有它的进程
  • MPI.MINLOC : 返回最小值,以及拥有它的进程

为了演示相加简化操作,我们使用 comm.Reduce 语句,并将含有 recvbuf 的 rank 设置为 0, recvdata 代表了最后的计算结果:

comm.Reduce(senddata, recvdata, root=0, op=MPI.SUM)

我们的 op = MPI.SUM 选项,将在所有的列上面应用求和操作。下图表示了这个步骤:

操作的过程如下:

  • 进程 P0 发出数据数组 [0 1 2]
  • 进程 P1 发出数据数组 [0 2 4]
  • 进程 P2 发出数据数组 [0 3 6]

规约操作将每个 task 的第 i 个元素相加,然后放回到 P0 进程的第 i 个元素中。在接收操作中, P0 收到数据 [0 6 12]。

2)代码
  • 在求一个数组的最大值的时候,可以让一个进程进行处理,也可以让多个进程同时处理,给每个进程一组特定的数据,求完之后再最最大值回收,可以减小root的数据处理压力,同时减小时间复杂度。
import mpi4py.MPI as MPI
import numpy as np

comm = MPI.COMM_WORLD   #通过命令行传入的参数np,调用MS-MPI获得一个通讯组,该通讯组定义了一组互相发消息的进程
comm_rank = comm.Get_rank()   #为每一个进程分配一个rank
comm_size = comm.Get_size()  #这组进程中共有comm_size个进程

if comm_rank == 0:
    data = np.random.randint(low = 0, high=10, size = (comm_size, 3))
    print( data )
else:
    data = None

local_data = comm.scatter( data, root=0 )  # root将raw数据散播到每个进程中,等待进程返回处理结果
print(f"rank {comm_rank} recieve data : {local_data}")  #接收进程通过local_data获得root节点散播的数据

# 和gather相同,由root=0进行数据的规约
all_sum = comm.reduce( local_data, op=MPI.SUM, root = 0)  # 数据处理完毕,该进程将结果发送给root
if comm_rank == 0:
    print(f"rank {comm_rank} reduce data : {all_sum}" )

命令行输入

mpiexec -np 5 python mpi4j_test/code005_reduce.py

控制台输出

rank 4 recieve data : [0 7 7]
rank 3 recieve data : [9 8 2]
rank 2 recieve data : [1 5 9]
rank 1 recieve data : [1 3 3]

[[1 3 3]
 [1 3 3]
 [1 5 9]
 [9 8 2]
 [0 7 7]]
rank 0 recieve data : [1 3 3]
rank 0 reduce data : [12 26 24]
3)自定义算子
  • MPI定义的op有SUM,MIN,MAX等函数,但是如果我们希望能够自定义处理函数,则可以自己实现。
  • op的输入参数是2个类型相同的变量,返回一个参数,一个简单的定义如下:my_func
import mpi4py.MPI as MPI
import numpy as np

# 自定义op
def my_func( a, b ):
    f = a*a + b*b
    return f

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = np.random.rand(comm_size, 1)
    print( data )
else:
    data = None

local_data = comm.scatter( data, root=0 )
local_data = -local_data # 对数据进行处理
print( "rank %d got data and finished dealing." % comm_rank )
print( local_data )

all_sum = comm.reduce( local_data, root=0, op=my_func )
if comm_rank == 0:
    print( "sum is : %f", all_sum )

其他知识点

1、死锁

1)概念

我们经常需要面临的一个问题是死锁,这是两个或多个进程点对点)都在阻塞等待对方释放自己想要的资源的情况。 mpi4py 没有提供特定的功能来解决这种情况,但是提供了一些程序员必须遵守的规则,来避免死锁问题。

from mpi4py import MPI
comm=MPI.COMM_WORLD
rank = comm.rank    #通过命令行传入的参数np,调用MS-MPI获得一个通讯组,该通讯组定义了一组互相发消息的进程
print("my rank is : " , rank)  #为每一个进程分配一个rank


'''必定会产生死锁'''
if rank==1:
    data_send= "a"
    destination_process = 5
    source_process = 5
    data_received=comm.recv(source=source_process)  #进程1阻塞,等待接收进程5发送的资源
    comm.send(data_send,dest=destination_process)   #进程1接收到5发送的资源后,向进程5发送资源
    print("sending data %s " %data_send + "to process %d" %destination_process)
    print("data received is = %s" %data_received)

if rank==5:
    data_send= "b"
    destination_process = 1
    source_process = 1
    data_received=comm.recv(source=source_process)  #进程5阻塞,等待接收进程1发送的资源
    comm.send(data_send,dest=destination_process)  #进程5接收到1发送的资源后,向进程1发送资源
    print("sending data %s :" %data_send + "to process %d" %destination_process)
    print("data received is = %s" %data_received)

命令行输入:

mpiexec -np 6 python mpi4j_test/code006_deadLock.py

控制台输出:

my rank is :  3
my rank is :  4
my rank is :  0
my rank is :  2
...  #死锁
mpiexec aborting job...

job aborted:
[ranks] message

[0] job terminated by the user

[1-5] terminated

---- error analysis -----

[0] on DESKTOP-5R02C26
ctrl-c was hit. job aborted by the user.
2)解决思路

方法1:调整两进程的rec,send的执行顺序,但是这种方法虽然在逻辑上可以避免死锁,但仍然存在(暂时)死锁的可能。主要原因是 comm.send() 函数将要发送的数据完全拷贝到buffer里,只有buffer里有完整的数据之后程序才能继续运行,否则,依然会产生死锁。

if rank==1:
    data_send= "a"
    destination_process = 5
    source_process = 5
    comm.send(data_send,dest=destination_process)   #进程1先向进程5发送资源
    data_received = comm.recv(source=source_process)  # 进程1阻塞,等待接收进程5发送的资源
    print("sending data %s " %data_send + "to process %d" %destination_process)
    print("data received is = %s" %data_received)

if rank==5:
    data_send= "b"
    destination_process = 1
    source_process = 1
    data_received=comm.recv(source=source_process)  #进程5阻塞,等待接收进程1发送的资源
    comm.send(data_send,dest=destination_process)  #进程5接收到1发送的资源后,向进程1发送资源
    print("sending data %s :" %data_send + "to process %d" %destination_process)
    print("data received is = %s" %data_received)

控制台输出:

my rank is :  4
my rank is :  3
my rank is :  0
my rank is :  2
my rank is :  5
sending data b :to process 1
data received is = a
my rank is :  1
sending data a to process 5
data received is = b

方法2:在mpi4py中,有一个特定的函数统一了向一特定进程发消息和从一特定进程接收消息的功能,叫做 Sendrecv

Sendrecv(self, sendbuf, int dest=0, int sendtag=0, recvbuf=None, int source=0, int recvtag=0, Status status=None)

可以看到,这个函数的参数同 comm.send() MPI 以及 comm.recv() MPI 相同。同时在这个函数里,整个函数都是阻塞的(里面有什么策略就不清楚了),相比于交给子系统来负责检查发送者和接收者之间的依赖,可以避免死锁问题。用这个方案改写之前的例子如下:

'''使用sendrecv函数'''
if rank==1:
    data_send= "a"
    destination_process = 5
    source_process = 5
    data_received=comm.sendrecv(data_send,dest=destination_process,source =source_process)  #该函数函数统一了向一特定进程发消息和从一特定进程接收消息的功能
    print(f"rank = {rank}, data received is = {data_received}")
if rank==5:
    data_send= "b"
    destination_process = 1
    source_process = 1
    data_received=comm.sendrecv(data_send,dest=destination_process, source=source_process)
    print("data received is = %s" % data_received)
    print(f"rank = {rank}, data received is = {data_received}")

控制台输出

my rank is :  4
my rank is :  0
my rank is :  2
my rank is :  3
my rank is :  5
data received is = a
rank = 5, data received is = a
my rank is :  1
rank = 1, data received is = b

2、通讯优化(★★★)

1)概念

拓扑是 MPI 提供的一个有趣功能。如前所述,所有通信功能(点对点或集体)都是指一组进程。我们一直使用包含所有进程的 MPI_COMM_WORLD 组。它为大小为n的通信组的每个进程分配 0 - n-1 的一个rank。但是,MPI允许我们为通信器分配虚拟拓扑。它为不同的进程定义了特定的标签分配。这种机制可以提高执行性能。实际上,如果构建虚拟拓扑,那么每个节点将只与其虚拟邻居进行通信,从而优化性能

例如,如果rank是随机分配的,则消息可能会在到达目的地之前被迫传递给许多其他节点。除了性能问题之外,虚拟拓扑还可确保代码更清晰可读。 MPI提供两种建筑拓扑。第一个构造创建笛卡尔拓扑,而后者可以创建任何类型的拓扑。具体来说,在第二种情况下,我们必须提供要构建的图形的邻接矩阵。我们将只处理笛卡尔拓扑,通过它可以构建多种广泛使用的结构:网格,环形,环形等等。用于创建笛卡尔拓扑的函数如下所示:

MPI提供两种建筑拓扑。第一个构造创建笛卡尔拓扑,而后者可以创建任何类型的拓扑。具体来说,在第二种情况下,我们必须提供要构建的图形的邻接矩阵。我们将只处理笛卡尔拓扑,通过它可以构建多种广泛使用的结构:网格,环形等等。用于创建笛卡尔拓扑的函数如下所示:

comm.Create_cart((number_of_rows,number_of_columns))

这里, number_of_rowsnumber_of_columns 指定了栅格的行列数。

2)代码

参考

在下面的例子中,我们将展示如何实现一个 M x N 的笛卡尔拓扑。

from mpi4py import MPI
import numpy as np
UP = 0
DOWN = 1
LEFT = 2
RIGHT = 3
neighbour_processes = [0,0,0,0]  #上,下,左,右

# 利用笛卡尔坐标系绘制进程间的拓扑结构M x N,可以绘制2D和3D
# 参考 https://learn2codewithmesite.wordpress.com/2017/10/16/creating-topologies-in-mpi4py/, https://wgropp.cs.illinois.edu/courses/cs598-s15/lectures/lecture28.pdf
if __name__ == "__main__":
    comm = MPI.COMM_WORLD  #通过命令行,获取通讯组
    rank = comm.rank  #获取当前进程的rank
    size = comm.size   #获取通讯组中进程的个数

    print(f"comm_rank = {rank}")  # 会被执行comm_size次,因为该程序在MPI中会被调用多次,完成进程间的通信
    print(f"comm_size = {size}")

    grid_rows = int(np.floor(np.sqrt(comm.size)))
    grid_column = comm.size // grid_rows
    # 最后的拓扑是 2x2 的网状结构,大小为4,和进程数一样:
    if grid_rows * grid_column > size:
        grid_column -= 1
    if grid_rows * grid_column > size:
        grid_rows -= 1
    if (rank == 0) :
        print("Building a %d x %d grid topology:" % (grid_rows, grid_column))

    # cartesian_communicator = comm.Create_cart( (grid_rows, grid_column), periods=(True, True), reorder=True)  #为通讯器建立笛卡尔虚拟拓扑
    cartesian_communicator = comm.Create_cart( (grid_rows, grid_column),periods=(False, False), reorder=True)  #为通讯器建立笛卡尔虚拟拓扑, Return type: Cacomm

    my_mpi_row, my_mpi_col = cartesian_communicator.Get_coords( cartesian_communicator.rank )  # 通过 Get_coords() 方法,可以确定一个进程横纵坐标

    #‘Shift’ allows us to know the rank of processes that are neighbours of a given process in a particular direction
    neighbour_processes[UP], neighbour_processes[DOWN] = cartesian_communicator.Shift(direction=0, disp=1)  # direction = 0表上下
    neighbour_processes[LEFT], neighbour_processes[RIGHT] =  cartesian_communicator.Shift(direction=1, disp=1)  #direction = 1表左右

    print ("Process = %s row = %s column = %s ----> neighbour_processes[UP] = %s neighbour_processes[DOWN] = %s neighbour_processes[LEFT] =%s neighbour_processes[RIGHT]=%s" % (
    rank, my_mpi_row, my_mpi_col,neighbour_processes[UP],
    neighbour_processes[DOWN], neighbour_processes[LEFT],
    neighbour_processes[RIGHT]))
a)结构1

命令行输入1(2 x 4的网络拓扑结构):

mpiexec -n 8 python mpi4j_test/code007_topology.py

控制台输出1:

comm_rank = 0
comm_size = 8
Building a 2 x 4 grid topology:
Process = 0 row = 0 column = 0 ----> neighbour_processes[UP] = -1 neighbour_processes[DOWN] = 4 neighbour_processes[LEFT] =-1 neighbour_processes[RIGHT]=1
comm_rank = 1
comm_size = 8
Process = 1 row = 0 column = 1 ----> neighbour_processes[UP] = -1 neighbour_processes[DOWN] = 5 neighbour_processes[LEFT] =0 neighbour_processes[RIGHT]=2
comm_rank = 3
comm_size = 8
Process = 3 row = 0 column = 3 ----> neighbour_processes[UP] = -1 neighbour_processes[DOWN] = 7 neighbour_processes[LEFT] =2 neighbour_processes[RIGHT]=-1
comm_rank = 2
comm_size = 8
Process = 2 row = 0 column = 2 ----> neighbour_processes[UP] = -1 neighbour_processes[DOWN] = 6 neighbour_processes[LEFT] =1 neighbour_processes[RIGHT]=3
comm_rank = 4
comm_size = 8
Process = 4 row = 1 column = 0 ----> neighbour_processes[UP] = 0 neighbour_processes[DOWN] = -1 neighbour_processes[LEFT] =-1 neighbour_processes[RIGHT]=5
comm_rank = 6
comm_size = 8
Process = 6 row = 1 column = 2 ----> neighbour_processes[UP] = 2 neighbour_processes[DOWN] = -1 neighbour_processes[LEFT] =5 neighbour_processes[RIGHT]=7
comm_rank = 5
comm_size = 8
Process = 5 row = 1 column = 1 ----> neighbour_processes[UP] = 1 neighbour_processes[DOWN] = -1 neighbour_processes[LEFT] =4 neighbour_processes[RIGHT]=6
comm_rank = 7
comm_size = 8
Process = 7 row = 1 column = 3 ----> neighbour_processes[UP] = 3 neighbour_processes[DOWN] = -1 neighbour_processes[LEFT] =6 neighbour_processes[RIGHT]=-1
b)结构2

命令行输入2(2 x 2的网络拓扑结构):

mpiexec -n 4 python mpi4j_test/code007_topology.py

控制台输出结果如下:

comm_rank = 0
comm_size = 4
Building a 2 x 2 grid topology:
Process = 0 row = 0 column = 0 ----> neighbour_processes[UP] = -1 neighbour_processes[DOWN] = 2 neighbour_processes[LEFT] =-1 neighbour_processes[RIGHT]=1
comm_rank = 2
comm_size = 4
Process = 2 row = 1 column = 0 ----> neighbour_processes[UP] = 0 neighbour_processes[DOWN] = -1 neighbour_processes[LEFT] =-1 neighbour_processes[RIGHT]=3
comm_rank = 3
comm_size = 4
Process = 3 row = 1 column = 1 ----> neighbour_processes[UP] = 1 neighbour_processes[DOWN] = -1 neighbour_processes[LEFT] =2 neighbour_processes[RIGHT]=-1
comm_rank = 1
comm_size = 4
Process = 1 row = 0 column = 1 ----> neighbour_processes[UP] = -1 neighbour_processes[DOWN] = 3 neighbour_processes[LEFT] =0 neighbour_processes[RIGHT]=-1

2 x 2的网络拓扑结构如下图所示:

在这里插入图片描述

对于每一个进程,输出结果都是:如果 neighbour_processes = -1 ,那么没有临近的拓扑,否则, neighbour_processes 显示最近的进程。

c)结构3

命令行输入:

mpiexec -n 6 python mpi4j_test/code007_topology.py

控制台输出:

comm_rank = 0
comm_size = 6
Building a 2 x 3 grid topology:
Process = 0 row = 0 column = 0 ----> neighbour_processes[UP] = -1 neighbour_processes[DOWN] = 3 neighbour_processes[LEFT] =-1 neighbour_processes[RIGHT]=1
comm_rank = 1
comm_size = 6
Process = 1 row = 0 column = 1 ----> neighbour_processes[UP] = -1 neighbour_processes[DOWN] = 4 neighbour_processes[LEFT] =0 neighbour_processes[RIGHT]=2
comm_rank = 2
comm_size = 6
Process = 2 row = 0 column = 2 ----> neighbour_processes[UP] = -1 neighbour_processes[DOWN] = 5 neighbour_processes[LEFT] =1 neighbour_processes[RIGHT]=-1
comm_rank = 3
comm_size = 6
Process = 3 row = 1 column = 0 ----> neighbour_processes[UP] = 0 neighbour_processes[DOWN] = -1 neighbour_processes[LEFT] =-1 neighbour_processes[RIGHT]=4
comm_rank = 4
comm_size = 6
Process = 4 row = 1 column = 1 ----> neighbour_processes[UP] = 1 neighbour_processes[DOWN] = -1 neighbour_processes[LEFT] =3 neighbour_processes[RIGHT]=5
comm_rank = 5
comm_size = 6
Process = 5 row = 1 column = 2 ----> neighbour_processes[UP] = 2 neighbour_processes[DOWN] = -1 neighbour_processes[LEFT] =4 neighbour_processes[RIGHT]=-1
c)结构4

如果把笛卡尔拓扑函数修改为

cartesian_communicator = comm.Create_cart( (grid_rows, grid_column), periods=(True, True), reorder=True)  #为通讯器建立笛卡尔虚拟拓扑

命令行输入2(2 x 2的网络拓扑结构):

mpiexec -n 4 python mpi4j_test/code007_topology.py

控制台输出2:

comm_rank = 2
comm_size = 4
Process = 2 row = 1 column = 0 ----> neighbour_processes[UP] = 0 neighbour_processes[DOWN] = 0 neighbour_processes[LEFT] =3 neighbour_processes[RIGHT]=3
comm_rank = 3
comm_size = 4
Process = 3 row = 1 column = 1 ----> neighbour_processes[UP] = 1 neighbour_processes[DOWN] = 1 neighbour_processes[LEFT] =2 neighbour_processes[RIGHT]=2
comm_rank = 0
comm_size = 4
Building a 2 x 2 grid topology:
Process = 0 row = 0 column = 0 ----> neighbour_processes[UP] = 2 neighbour_processes[DOWN] = 2 neighbour_processes[LEFT] =1 neighbour_processes[RIGHT]=1
comm_rank = 1
comm_size = 4
Process = 1 row = 0 column = 1 ----> neighbour_processes[UP] = 3 neighbour_processes[DOWN] = 3 neighbour_processes[LEFT] =0 neighbour_processes[RIGHT]=0

2 x 2的网络拓扑结构如下图所示:

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐