跳转至

向量化

约 1084 个字 232 行代码 预计阅读时间 7 分钟

向量化简介

向量化是实现 SIMD 的一种方法,它通过使用特殊的 CPU 指令集来处理数据向量。这些指令集能够一次性对多个数据元素执行相同的操作,从而提高数据处理的效率。

SIMD

在现代计算中,SIMD(单指令多数据)是一种并行计算技术,它允许单个指令同时对多个数据执行操作。这种技术通过在单个指令周期内处理多个数据来提高性能,特别适合于图像处理、音频处理、科学计算等领域,其中有许多可以并行处理的数据。

常见的 SIMD 指令集

  • AVX-512:支持 512 位宽的 SIMD 操作,常见于 Intel 与 AMD 服务器级处理器中。
  • ARM NEON:ARM 架构的 SIMD 指令集,支持 128 位宽的操作。

向量化使用方法

向量化的主要使用方法有两种

  • 通过内联汇编写入代码中
  • 让编译器自动优化

内联汇编

下面是一个使用 AVX-512 指令集的例子,我们将使用 C++ 和内联汇编来演示如何向量化一个简单的数组加法操作。

#include <immintrin.h>
#include <iostream>

void avx512_add(float* a, float* b, float* result, int n) {
    // 处理数组的前n个元素
    for (int i = 0; i < n; i += 16) {
        // 加载数据到ZMM寄存器
        __m512 va = _mm512_loadu_ps(a + i);
        __m512 vb = _mm512_loadu_ps(b + i);
        // 执行AVX-512加法
        __m512 vresult = _mm512_add_ps(va, vb);
        // 将结果存回内存
        _mm512_storeu_ps(result + i, vresult);
    }
}

int main() {
    const int n = 1024; 
    float a[n], b[n], result[n];

    for (int i = 0; i < n; i++) {
        a[i] = i;
        b[i] = i;
    }

    avx512_add(a, b, result, n);

    for (int i = 0; i < n; i++) {
        std::cout << "Result[" << i << "] = " << result[i] << std::endl;
    }

    return 0;
}

编译器优化

如果你尝试运行了上述的代码并和普通版本的比较,你可能会发现两者速度并没有较大区别。这是因为现代编译器往往集成了自动向量化的能力,能够自动检测可以向量化的部分并在编译时完成向量化。

但是编译器只能完成在简单的场景下的自动向量化,如果代码略微复杂,则不能自动向量化。此时我们可以通过修改代码引导编译器完成优化。

下面的例子改编自第一届 PKU HPCGame,该例子很好的向我们展示了如何引导编译器自动向量化优化代码:

base.cpp 中实现了对输入的 double 类型的数组中的每个数进行 logistic 映射迭代 itn 轮的功能,opt.cpp 是优化过后的版本,conf.data 为对应的可执行程序所需的输入文件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <iostream>
#include <chrono>
#include <random>
#include <fstream>

double it(double r, double x, int64_t itn) {
    for (int64_t i = 0; i < itn; i++) {
        x = r * x * (1.0 - x);
    }
    return x;
}

void itv(double r, double* x, int64_t n, int64_t itn) {
    for (int64_t i = 0; i < n; i++) {
        x[i] = it(r, x[i], itn);
    }
}

int main()
{
    int64_t itn;
    double r;
    int64_t n;
    double *x;

    std::ifstream fi("conf.data");
    fi >> itn >> r >> n;
    fi.close();

    x = (double *)malloc(n * 8);
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_real_distribution<> dis(0.0, 4.0);
    for(int i=0; i<n ;i++){
        x[i] = dis(gen);
    }

    auto t1 = std::chrono::steady_clock::now();
    itv(r, x, n, itn);
    auto t2 = std::chrono::steady_clock::now();
    int d1 = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
    printf("%d\n", d1);


    return 0;
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <iostream>
#include <chrono>
#include <random>
#include <fstream>

template <int n>
void it(double r, double *x, int64_t itn)
{
    for (int64_t i = 0; i < itn; i++)
    {
        for (int64_t j = 0; j < n; j++)
        {
            x[j] = r * x[j] * (1.0 - x[j]);
        }
    }
}

template <int gn>
void itvg(double r, double *x, int64_t n, int64_t itn)
{
    for (int64_t i = 0; i < n; i += gn)
    {
        it<gn>(r, x + i, itn);
    }
}

int main()
{
    int64_t itn;
    double r;
    int64_t n;
    double *x;

    std::ifstream fi("conf.data");
    fi >> itn >> r >> n;
    fi.close();

    x = (double *)malloc(n * 8);
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_real_distribution<> dis(0.0, 4.0);
    for(int i=0; i<n ;i++){
        x[i] = dis(gen);
    }

    auto t1 = std::chrono::steady_clock::now();
    itvg<128>(r, x, n, itn);
    auto t2 = std::chrono::steady_clock::now();
    int d1 = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
    printf("%d\n", d1);


    return 0;
}
CXX = clang++
CXXFLAGS = -O3

all: base opt
clean : 
    rm  base opt
base: base.cpp
    $(CXX) $(CXXFLAGS) base.cpp -o base
opt: opt.cpp
    $(CXX) $(CXXFLAGS) opt.cpp -o opt

.PHONY: all clean
32768
0.1896
32768

base.cpp 通过两步优化即可得到 opt.cpp

  1. 数据并行化

    • base.cpp 中,函数对数组中的每个数依次进行迭代。对于单个数来说,迭代的过程显然是必须严格串行的,这样的代码让编译器无法完成自动的向量化。
    • 注意到同一个数组中各个数的迭代是可以并行处理的,且其所在内存连续,每轮迭代对应的操作相同,这非常契合向量化指令集。
    • 从而我们可以交换循环次序,让编译器进行自动向量化。
  2. 访存优化

    • 在交换循环次序后,我们每轮迭代先将整个数组完成更新,再迭代下一轮。在数组较大的情况下,显然寄存器与高速缓存无法存放所有的内存数据,这将导致频繁的从内存中读写数据。
    • 为了降低访存开销,我们可以将整个数组分块。每块的大小即为 opt.cpp 中的 gn,对于每个 gn 大小的小块,我们采用数据并行化的方法进行计算,每算完一块再计算下一块,最大程度降低了从内存中读写文件的开销。其中 gn 的最佳大小与具体的硬件有关,我们采用模版参数方法定义,测试多组找到最佳的参数即可。

向量化实验

关于实验

本模块的实验均基于 C++ 与 AVX-512,如果需要使用提示/参考答案,请确认你的处理器支持 AVX512。

你也可以选择尝试自行改编答案来使其正常运行。

  1. 在“代码提示”内给出了一段矩阵加法的向量化程序,请你对其进行 4 倍循环展开。

    代码提示
    #include <immintrin.h>
    
    // 基础向量化实现
    void matrix_add_basic(const float* A, const float* B, float* C, int M, int N) {
        int total = M * N;
    
        for (int i = 0; i < total; i += 16) {
            // 加载16个元素
            __m512 a = _mm512_loadu_ps(&A[i]);
            __m512 b = _mm512_loadu_ps(&B[i]);
    
            // 向量加法
            __m512 c = _mm512_add_ps(a, b);
    
            // 存储结果
            _mm512_storeu_ps(&C[i], c);
        }
    }
    
  2. 计算数组 arr 的 Sigmoid 函数:1/(1+exp(-x))

    提示

    使用 _mm512_expm1_ps 来计算 exp(-x)

  3. 使用向量化实现一个 16*16 的矩阵乘法,基础版本已在“代码提示”给出。

    代码提示
    #include <iostream>
    #include <cmath>
    #include <immintrin.h>
    
    const int N = 16;
    
    // 基础版本
    void matrix_mul_basic(float A[N][N], float B[N][N], float C[N][N]) {
        for (int i = 0; i < N; i++) {
            for (int j = 0; j < N; j++) {
                float sum = 0.0f;
                for (int k = 0; k < N; k++) {
                    sum += A[i][k] * B[k][j];
                }
                C[i][j] = sum;
            }
        }
    }
    
    // ======== TODO:AVX512优化版本 ========
    void matrix_mul_avx512(float A[N][N], float B[N][N], float C[N][N]) {
        //...
        //...
        //...
        //...
    }
    
    // 验证结果一致性
    bool verify_results(float C1[N][N], float C2[N][N], float epsilon = 1e-5f) {
        for (int i = 0; i < N; i++) {
            for (int j = 0; j < N; j++) {
                if (std::fabs(C1[i][j] - C2[i][j]) > epsilon) {
                    std::cerr << "Mismatch at (" << i << "," << j << "): "
                            << C1[i][j] << " vs " << C2[i][j] << std::endl;
                    return false;
                }
            }
        }
        return true;
    }
    
    int main() {
        float A[N][N], B[N][N];
        float C_basic[N][N], C_avx[N][N];
    
        // 初始化矩阵
        for (int i = 0; i < N; i++) {
            for (int j = 0; j < N; j++) {
                A[i][j] = static_cast<float>(i * N + j);
                B[i][j] = static_cast<float>(i - j);
            }
        }
    
        matrix_mul_basic(A, B, C_basic);
    
        matrix_mul_avx512(A, B, C_avx);
    
        // 验证结果
        if (verify_results(C_basic, C_avx)) {
            std::cout << "Verification PASSED" << std::endl;
        } else {
            std::cout << "Verification FAILED" << std::endl;
        }
    
        return 0;
    }