Next: イントロソート(intro sort) Up: トップページ Previous: 差分 MSD radix sort

OpenMP 版 差分 MSD radix sort

差分 MSD radix sort には改良の余地があります。それは再帰処理の並列化です。 差分 MSD radix sort の再帰呼び出しで処理される区間は互いに独立していますので、 これを並列化することができます。 以下に OpenMP により並列化された差分 MSD radix sort のコードを示します。

リスト1:OpenMP版 差分 MSD radix sort(mdiff_sort_omp.cpp)
#ifndef mdiff_sort_omp_cpp
#define mdiff_sort_omp_cpp

#include <iterator>
#include <functional>
#include <new>

template <class RAI, class S>
void mdiff_sort_omp(RAI a0, RAI aN, S sub);

template <class RAI, class S>
static void mdiff_sort_omp_2nd(RAI a0, RAI aN, S sub);

template <class RAI>
inline void mdiff_sort_omp(RAI a0, RAI aN) //subを省略した時に呼び出す。
{
        typedef typename std::iterator_traits<RAI>::value_type val_t;
        mdiff_sort_omp(a0, aN, std::minus<val_t>());
}

template <class RAI, class S>
void mdiff_sort_omp(RAI a0, RAI aN, S sub) //この関数から呼び出す。
{
        typedef typename std::iterator_traits<RAI>::value_type val_t;
        typedef typename std::iterator_traits<RAI>::difference_type dif_t;
        typedef typename S::result_type key_type;
        static const int SIGS = 9;
        static const int KEYS = 1 << SIGS;
        const dif_t N = aN - a0;
        const RAI& a = a0;

        if (N < 40) {
                if (a0 == aN) return;
                RAI p = a0;
                for (++p; p != aN; ++p) {
                        const val_t tmp = *p;
                        RAI q, r; q = r = p;
                        if (sub(tmp, *a0) < 0)
                                while (a0 != q) { --r; *q = *r; q = r; }
                        else 
                                while (sub(tmp, *(--r)) < 0) { *q = *r; q = r; }
                        *q = tmp;
                }
                return;
        }
        val_t min_a, max_a;
        min_a = max_a = *a0;
        for (RAI ai = a0 + 1; ai < aN; ++ai) {
                const val_t& x = *ai;
                if (sub(x, min_a) < 0) min_a = x;
                else if (sub(max_a, x) < 0) max_a = x;
        }
        key_type d = sub(max_a, min_a);
        if (d == 0) return;
        int ld;
        for (ld = 0; d > 0; d >>= 1) ld++;
        const int shift = (ld > SIGS) ? ld - SIGS : 0;
        dif_t* const h = new dif_t[KEYS];
        val_t* b;
        try { b = new val_t[N]; }
        catch (std::bad_alloc&) { delete[] h; throw; }
        for (int k = 0; k < KEYS; k++) h[k] = 0;
        val_t* bi = b;
        for (RAI ai = a0; ai < aN; ++ai, ++bi) {
                const val_t& x = *ai;
                ++h[int(sub(x, min_a) >> shift)];
                *bi = x;
        }
        for (int k = 1; k < KEYS; k++) h[k] += h[k - 1];
        for (bi = b + N; bi > b;) {
                const val_t& x = *(--bi);
                const dif_t j = --h[int(sub(x, min_a) >> shift)];
                a[j] = x;
        }
        delete[] b;
        if (ld > SIGS) {
                #pragma omp parallel for
                for (int k = 0; k < KEYS - 1; k++)
                        mdiff_sort_omp_2nd(a0 + h[k], a0 + h[k + 1], sub);
                mdiff_sort_omp_2nd(a0 + h[KEYS - 1], aN, sub);
        }
        delete[] h;
}

template <class RAI, class S>
static void mdiff_sort_omp_2nd(RAI a0, RAI aN, S sub)
{
        typedef typename std::iterator_traits<RAI>::value_type val_t;
        typedef typename std::iterator_traits<RAI>::difference_type dif_t;
        typedef typename S::result_type key_type;
        static const int SIGS = 9;
        static const int KEYS = 1 << SIGS;
        const dif_t N = aN - a0;
        const RAI& a = a0;

        if (N < 40) {
                if (a0 == aN) return;
                RAI p = a0;
                for (++p; p != aN; ++p) {
                        const val_t tmp = *p;
                        RAI q, r; q = r = p;
                        if (sub(tmp, *a0) < 0)
                                while (a0 != q) { --r; *q = *r; q = r; }
                        else 
                                while (sub(tmp, *(--r)) < 0) { *q = *r; q = r; }
                        *q = tmp;
                }
                return;
        }
        val_t min_a, max_a;
        min_a = max_a = *a0;
        for (RAI ai = a0 + 1; ai < aN; ++ai) {
                const val_t& x = *ai;
                if (sub(x, min_a) < 0) min_a = x;
                else if (sub(max_a, x) < 0) max_a = x;
        }
        key_type d = sub(max_a, min_a);
        if (d == 0) return;
        int ld;
        for (ld = 0; d > 0; d >>= 1) ld++;
        const int shift = (ld > SIGS) ? ld - SIGS : 0;
        dif_t* const h = new dif_t[KEYS];
        val_t* b;
        try { b = new val_t[N]; }
        catch (std::bad_alloc&) { delete[] h; throw; }
        for (int k = 0; k < KEYS; k++) h[k] = 0;
        val_t* bi = b;
        for (RAI ai = a0; ai < aN; ++ai, ++bi) {
                const val_t& x = *ai;
                ++h[int(sub(x, min_a) >> shift)];
                *bi = x;
        }
        for (int k = 1; k < KEYS; k++) h[k] += h[k - 1];
        for (bi = b + N; bi > b;) {
                const val_t& x = *(--bi);
                const dif_t j = --h[int(sub(x, min_a) >> shift)];
                a[j] = x;
        }
        delete[] b;
        if (ld > SIGS) {
                for (int k = 0; k < KEYS - 1; k++)
                        mdiff_sort_omp_2nd(a0 + h[k], a0 + h[k + 1], sub);
                mdiff_sort_omp_2nd(a0 + h[KEYS - 1], aN, sub);
        }
        delete[] h;
}

#endif // mdiff_sort_omp_cpp

Next: イントロソート(intro sort) Up: トップページ Previous: 差分 MSD radix sort 01360