sssp_enactor.cuh

#

The enactor defines how a graph primitive runs: it invokes the traversal operators (advance and filter) and the per-element computation (functors).
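
Conceptually, each iteration expands the current frontier with advance and compacts the result with filter, until the frontier is empty. A minimal sketch of that control flow, using hypothetical Advance/Filter wrappers rather than Gunrock's actual API:

    // Hypothetical shape of the loop the enactor drives:
    while (frontier_length > 0)                 // until the frontier empties
    {
        Advance<Functor>(in_queue, out_queue);  // per edge: CondEdge, then ApplyEdge
        Filter <Functor>(out_queue, in_queue);  // per vertex: CondFilter, then ApplyFilter
        frontier_length = GetQueueLength();     // read back the new frontier size
    }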

template<
    typename KernelPolicy,     // kernel policy type; includes definitions of VertexId, SizeT and Value
    int NUM_VERTEX_ASSOCIATES, // number of associate values of type VertexId transmitted with remote sub-frontiers
    int NUM_VALUE_ASSOCIATES>  // number of associate values of type Value transmitted with remote sub-frontiers
__global__ void Expand_Incoming_Kernel(
#

Kernel to combine received and local data

             int                     thread_num,            // thread number
    typename KernelPolicy::VertexId  label,                 // label, equal to iteration number here
    typename KernelPolicy::SizeT     num_elements,          // size of received sub-frontier
    typename KernelPolicy::VertexId *d_keys_in,             // received sub-frontier
    typename KernelPolicy::VertexId *d_vertex_associate_in, // received associatives of VertexId type
    typename KernelPolicy::Value    *d_value__associate_in, // received associatives of Value type
    typename KernelPolicy::SizeT    *d_out_length,          // output frontier size counter, may not start with 0
    typename KernelPolicy::VertexId *d_keys_out,            // output frontier
    typename KernelPolicy::VertexId *d_preds,               // the local per-vertex predecessor marker
    typename KernelPolicy::Value    *d_distances,           // the local per-vertex distances marker
    typename KernelPolicy::VertexId *d_labels)              // the local per-vertex label marker
{
#

Below is algorithmically equivalent to gunrock::app::sssp::Expand_Incoming_Kernel, but without its optimizations.

    typedef typename KernelPolicy::VertexId VertexId;
    typedef typename KernelPolicy::SizeT    SizeT;
    typedef typename KernelPolicy::Value    Value;

    SizeT x = (SizeT)blockIdx.x * blockDim.x + threadIdx.x;
    const SizeT STRIDE = (SizeT)blockDim.x * gridDim.x;
    while (x < num_elements) // for each element in the received sub-frontier
    {
        VertexId key          = d_keys_in[x];                           // received vertex Id
        Value    distance     = d_value__associate_in[x];               // received distance
        Value    old_distance = atomicMin(d_distances + key, distance); // compared with local distance
        if (old_distance > distance && d_labels[key] != label)
        { // only when the local distance was actually lowered, and the vertex is not yet in the local frontier
            d_labels[key] = label;
            if (NUM_VERTEX_ASSOCIATES == 1 && d_distances[key] == distance)
#

if predecessors need to be marked, and the current distance has not changed since the atomicMin

                d_preds[key] = d_vertex_associate_in[x];  // assign the received predecessor
            d_keys_out[atomicAdd(d_out_length, 1)] = key; // put vertex in output frontier
        }
        
        x += STRIDE; // persistent-thread (grid-stride) loop
    }
}
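
#

How such a kernel might be launched over a received sub-frontier; the block and grid sizing below are illustrative assumptions, not Gunrock's actual launch configuration:

    // Hypothetical launch; the grid-stride loop inside the kernel covers any
    // num_elements, so the exact grid size only affects occupancy.
    int block_size = 256;
    int grid_size  = (num_elements + block_size - 1) / block_size;
    Expand_Incoming_Kernel<KernelPolicy, 1, 1> // 1 VertexId (preds) + 1 Value (distances) associate
        <<<grid_size, block_size, 0, stream>>>(
        thread_num, label, num_elements,
        d_keys_in, d_vertex_associate_in, d_value__associate_in,
        d_out_length, d_keys_out, d_preds, d_distances, d_labels);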

template<
    typename AdvanceKernelPolicy, // Kernel policy for advance
    typename FilterKernelPolicy,  // Kernel policy for filter
    typename Enactor>             // type of enactor
struct SSSPIteration : public IterationBase < // Based on IterationBase
#

Main iteration loop for SSSP primitive

    AdvanceKernelPolicy, FilterKernelPolicy, Enactor,
    false,  //HAS_SUBQ = false, don't have SubQ_Core
    true,   //HAS_FULLQ = true, has FullQ_Core
    false,  //BACKWARD = false, communication is not backward
    true,   //FORWARD = true, communication is forward
    Enactor::Problem::MARK_PATHS> // MARK_PREDECESSORS, whether to mark predecessors
{
#

which Functor the iteration uses

    typedef SSSPFunctor<VertexId, SizeT, Value, Problem> Functor;
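
#

A condensed sketch of what this functor supplies, simplified from gunrock::app::sssp (the real signatures carry more parameters):

    // CondEdge: relax edge (s_id -> d_id); keep d_id only if its distance dropped.
    static __device__ __forceinline__ bool CondEdge(
        VertexId s_id, VertexId d_id, DataSlice *d, SizeT e_id /*, ...*/)
    {
        Value new_distance = d -> distances[s_id] + d -> weights[e_id];
        // atomicMin returns the previous value; a larger previous value
        // means this thread actually lowered d_id's distance
        return new_distance < atomicMin(d -> distances + d_id, new_distance);
    }

    // ApplyEdge: record the predecessor when paths are marked.
    static __device__ __forceinline__ void ApplyEdge(
        VertexId s_id, VertexId d_id, DataSlice *d /*, ...*/)
    {
        if (Problem::MARK_PATHS) d -> preds[d_id] = s_id;
    }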

    template <
        int NUM_VERTEX_ASSOCIATES, // number of associate values of type VertexId transmitted with remote sub-frontiers
        int NUM_VALUE__ASSOCIATES> // number of associate values of type Value transmitted with remote sub-frontiers
    static void Expand_Incoming(...)
    {
#

Check whether the memory allocated for the output array is sufficient, and set out_length[peer_] to 0 first, when not using unified receiving.
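
A hedged sketch of the elided bookkeeping, with names following the surrounding code (illustrative, not Gunrock's exact statements):

        if (retval = CheckSize<SizeT, VertexId>(
            size_check, "keys_out", num_elements, &keys_out, ...))
            return;
        out_length[peer_] = 0;                           // restart the output counter
        out_length.Move(HOST, DEVICE, 1, peer_, stream); // push the zero to the GPU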

        Expand_Incoming_Kernel
            <AdvanceKernelPolicy, NUM_VERTEX_ASSOCIATES, NUM_VALUE__ASSOCIATES>
            <<<...>>>
            (gpu_idx,            // thread_num
            iteration,           // label (the iteration number)
            num_elements,        // received sub-frontier size
            keys_in,             // received sub-frontier
            vertex_associate_in, // received associatives of VertexId type
            value__associate_in, // received associatives of Value type
            out_length,          // output frontier size
            keys_out,            // output frontier
            preds,               // local predecessor marker array
            distances,           // local distance array
            labels);             // local label array
        out_length.Move(DEVICE, HOST, ...); // copy the resulting frontier length back to the CPU
    }

    static cudaError_t Compute_OutputLength(...)
#

Compute the resulting frontier size, for memory (re)allocation purposes

    {
        cudaError_t retval = cudaSuccess;
        if (!size_check // no need to check whether memory allocation is sufficient
            && !hasPreScan<AdvanceKernelPolicy::ADVANCE_MODE>()) // advance does not require pre-scan
        {
            frontier_attribute -> output_length[0] = 0; // has no memory requirement
            return retval;
        } else {
#

check whether partitioned_scanned_edges array is sufficient

            if (retval = CheckSize<SizeT, SizeT>(
                size_check, "scanned_edges",
                frontier_attribute -> queue_length + 2,
                partitioned_scanned_edges, ...))
                return retval;
#

Perform a scan over the out-degrees of the current frontier; the result is used for the memory size check and possible reallocation, and also for load balancing

            retval = gunrock::oprtr::advance::ComputeOutputLength
                <...>(frontier_attribute, d_in_key_queue, partitioned_scanned_edges, ...);
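
#

For example (illustrative numbers): if the current frontier holds three vertices with out-degrees {3, 1, 4}, the exclusive scan stored in partitioned_scanned_edges is {0, 3, 4} and the computed output length is 3 + 1 + 4 = 8 edges; the load-balanced advance also uses these offsets to assign edge ranges to threads.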
#

copy the computed output size back to the CPU

            frontier_attribute -> output_length.Move(DEVICE, HOST, 1, 0, stream);
            return retval;
        }
    }

    static void Check_Queue_Size(...)
#

Ensure output frontier allocations are sufficient

    {
        if (!size_check // no need to check whether memory allocation is sufficient
           && !hasPreScan<AdvanceKernelPolicy::ADVANCE_MODE>()) // advance does not require pre-scan
        {
            frontier_attribute -> output_length[0] = 0;
            return; // no need to guarantee sufficient memory
        } else if (!gunrock::oprtr::advance::isFused<AdvanceKernelPolicy::ADVANCE_MODE>()) // do not use kernel fusion
        {
#

make sure the allocation for the frontier between advance and filter is sufficient

            if (retval = CheckSize<SizeT, VertexId>(
                ..., request_length, frontier_queue -> keys[selector^1], ...))
                return;
#

make sure the allocation for the frontier after filter is sufficient

            if (retval = CheckSize<SizeT, VertexId>(
                ..., graph_slice -> nodes * 1.2, frontier_queue -> keys[selector], ...))
                return;
        } else { // when use kernel fusion
#

make sure the allocation for the frontier after filter is sufficient

            if (retval = CheckSize<SizeT, VertexId>(
                ..., graph_slice -> nodes * 1.2, frontier_queue -> keys[selector^1], ...))
                return;
        }
    }
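
#

The CheckSize helper used above roughly behaves as sketched below; this assumes Gunrock's Array1D-style storage and describes the helper's semantics, not its exact implementation:

    template <typename SizeT, typename Type>
    cudaError_t CheckSize(
        bool size_check, const char *name, SizeT request,
        util::Array1D<SizeT, Type> *array, ...)
    {
        if (request <= array -> GetSize()) return cudaSuccess; // already large enough
        if (!size_check) return cudaErrorMemoryAllocation;     // growing is not allowed
        return array -> EnsureSize(request);                   // reallocate on the GPU
    }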

    static void FullQueue_Core(...)
#

The main per-iteration computation step

    {
        frontier_attribute -> queue_reset = true; // use frontier_attribute -> queue_length as the input frontier size
#

Edge Map: traverse the outgoing edges of every vertex in the input frontier; for each such edge, evaluate Functor::CondEdge(...), and if it returns true, execute Functor::ApplyEdge(...)

        gunrock::oprtr::advance::LaunchKernel
            <..., Functor, ...>(
            frontier_attribute, // frontier attributes
            data_slice,         // problem associative data
            frontier_queue -> keys[frontier_attribute -> selector  ], // input frontier
            frontier_queue -> keys[frontier_attribute -> selector^1], // output frontier
            graph_slice, // row_offsets, column_indices, etc.
            ...);

        if (!gunrock::oprtr::advance::isFused<AdvanceKernelPolicy::ADVANCE_MODE>()) // if kernel fusion is not in effect
        {
            frontier_attribute -> queue_reset = false; // use the output frontier length from advance as the input frontier length to filter
            frontier_attribute -> queue_index ++;
            frontier_attribute -> selector ^= 1;       // alternate the ping-pong buffers
#

Vertex Map: for every vertex in the output frontier of advance, evaluate Functor::CondFilter(...); if it returns true, execute Functor::ApplyFilter(...) and place the vertex in the output frontier

            gunrock::oprtr::filter::LaunchKernel
                <..., Functor, ...>(
                frontier_attribute, // frontier_attributes,
                data_slice,         // problem associative data,
                frontier_queue -> keys[frontier_attribute -> selector  ], // input frontier
                frontier_queue -> keys[frontier_attribute -> selector^1], // output frontier
                graph_slice, // num_nodes, etc.
                ...); 
        }

        frontier_attribute -> queue_index ++;
        frontier_attribute -> selector ^= 1;       // alternate the ping-pong buffers
        if (retval = work_progress -> GetQueueLength(
            frontier_attribute -> queue_index,
            frontier_attribute -> queue_length,
            ...)) return; // get the output queue length on CPU
    }
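
#

A short trace of the ping-pong bookkeeping above (illustrative): with selector = 0, advance reads keys[0] and writes keys[1]; after the first toggle, filter reads keys[1] and writes keys[0]; the final toggle makes whichever buffer the last kernel wrote the input of the next iteration, while queue_index tells work_progress which length counter to read.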

};

template<
    typename AdvanceKernelPolicy, // Kernel policy for advance
    typename FilterKernelPolicy,  // Kernel policy for filter
    typename Enactor>             // type of enactor
static CUT_THREADPROC SSSPThread(
#

the per-GPU controlling thread, running on the CPU

    void *thread_data_)           // data package to bypass the thread boundary
{
#

... thread preparation

    
    if (retval = SetDevice(gpu_idx))
    { // if setting the device failed, quit
        thread_data -> status = ThreadSlice::Status::Ended;
        CUT_THREADEND;
    }

    thread_data -> status = ThreadSlice::Status::Idle; // ready, and waiting to enact
    while (thread_data -> status != ThreadSlice::Status::ToKill) // loop until instructed to exit
    {
        while (thread_data -> status == ThreadSlice::Status::Wait ||
               thread_data -> status == ThreadSlice::Status::Idle)
        {
            sleep(0); // wait until status changes
        }
        if (thread_data -> status == ThreadSlice::Status::ToKill)
            break; // end if instructed

        for (int peer_=0;peer_<num_gpus;peer_++)
        { // set up the frontiers
            frontier_attribute[peer_].queue_index  = 0; // Work queue index
            frontier_attribute[peer_].queue_length = peer_==0?thread_data->init_size:0;
            frontier_attribute[peer_].selector     = 0;
            frontier_attribute[peer_].queue_reset  = true;
            enactor_stats     [peer_].iteration    = 0;
        }
#

Run the SSSP iteration loop

        gunrock::app::Iteration_Loop
            <Enactor, Functor, SSSPIteration<...>, ...>
            (thread_data);

        thread_data -> status = ThreadSlice::Status::Idle; // signal work done
    }
    CUT_THREADEND;
}

template <typename Problem> // type of Problem
#

The SSSP enactor, derived from EnactorBase

class SSSPEnactor : public EnactorBase<typename Problem::SizeT> 
{
    ThreadSlice *thread_slices; // thread data for CPU control threads
    CUTThread   *thread_Ids;    // thread Id for CPU control threads

public:
#

constructor of the Enactor

    SSSPEnactor(...)
    ...
    {}
#

destructor of the Enactor

    virtual ~SSSPEnactor() 
    {
        Release(); // release allocated memory
    }
#

routine to release allocated memory

    cudaError_t Release()
    {
        cudaError_t retval = cudaSuccess;
        if (thread_slices != NULL)
        {
            for (int gpu = 0; gpu < this->num_gpus; gpu++)
                thread_slices[gpu].status = ThreadSlice::Status::ToKill;
            cutWaitForThreads(thread_Ids, this->num_gpus); // kill all GPU controlling threads
            delete[] thread_Ids   ; thread_Ids    = NULL; 
            delete[] thread_slices; thread_slices = NULL; // deallocate thread data
        }
        if (retval = BaseEnactor::Release()) return retval; // release memory allocated by parent class
        problem = NULL;
        return retval;
    }

    template<
        typename AdvanceKernelPolicy, // kernel policy for advance operator
        typename FilterKernelPolicy>  // kernel policy for filter operator
    cudaError_t InitSSSP(             // initialize SSSP Enactor
        ContextPtr  *context,         // mGPU ContextPtr
        Problem     *problem,         // problem data
        int         max_grid_size = 0)
    {
        cudaError_t retval = cudaSuccess;
#

Initialize BaseEnactor

        if (retval = BaseEnactor::Init(
            max_grid_size,
            AdvanceKernelPolicy::CTA_OCCUPANCY,
            FilterKernelPolicy::CTA_OCCUPANCY))
            return retval;

        this->problem = problem;
        thread_slices = new ThreadSlice [this->num_gpus];
        thread_Ids    = new CUTThread   [this->num_gpus]; // GPU controlling thread data

        for (int gpu=0;gpu<this->num_gpus;gpu++)
        {
#

assign thread data

            thread_slices[gpu].thread_num    = gpu;
            thread_slices[gpu].problem       = (void*)problem;
            thread_slices[gpu].enactor       = (void*)this;
            thread_slices[gpu].context       = &(context[gpu*this->num_gpus]);
            thread_slices[gpu].status        = ThreadSlice::Status::Inited;
#

start the thread

            thread_slices[gpu].thread_Id     = cutStartThread(
                    (CUT_THREADROUTINE)&(SSSPThread<
                        AdvanceKernelPolicy, FilterKernelPolicy,
                        SSSPEnactor<Problem> >),
                    (void*)&(thread_slices[gpu]));
            thread_Ids[gpu] = thread_slices[gpu].thread_Id;
        }
#

wait until all controlling threads are ready

        for (int gpu=0; gpu < this->num_gpus; gpu++)
        {
            while (thread_slices[gpu].status != ThreadSlice::Status::Idle)
            {
                sleep(0);
            }
        }
        return retval;
    }
#

Routine to reset the Enactor

    cudaError_t Reset()
    {
        cudaError_t retval = cudaSuccess;
#

Reset the BaseEnactor

        if (retval = BaseEnactor::Reset())
            return retval;
#

Signal every controlling thread to wait

        for (int gpu=0; gpu < this -> num_gpus; gpu++)
            thread_slices[gpu].status = ThreadSlice::Status::Wait;
        return retval;
    }

    template<
        typename AdvanceKernelPolicy,
        typename FilterKernelPolicy>
    cudaError_t EnactSSSP(
        VertexId src)    // the source vertex
    {
        cudaError_t  retval     = cudaSuccess;
#

prepare initial frontier size for each GPU

        for (int gpu=0;gpu<this->num_gpus;gpu++)
        {
            if ((this->num_gpus ==1) || (gpu==this->problem->partition_tables[0][src]))
                 thread_slices[gpu].init_size=1;
            else thread_slices[gpu].init_size=0;
            this->frontier_attribute[gpu*this->num_gpus].queue_length
                = thread_slices[gpu].init_size;
        }
#

signal the controlling threads to start processing

        for (int gpu=0; gpu< this->num_gpus; gpu++)
        {
            thread_slices[gpu].status = ThreadSlice::Status::Running;
        }
#

wait until all controlling threads finish

        for (int gpu=0; gpu< this->num_gpus; gpu++)
        {
            while (thread_slices[gpu].status != ThreadSlice::Status::Idle)
            {
                sleep(0);
            }
        }
#

check whether any GPU hit an error

        for (int gpu = 0; gpu < this->num_gpus * this->num_gpus; gpu++)
            if (this->enactor_stats[gpu].retval != cudaSuccess)
            {
                retval = this->enactor_stats[gpu].retval;
                return retval;
            }

        if (this -> debug) printf("\nGPU SSSP Done.\n");
        return retval;
    }
#

Define filter and advance KernelPolicy

#

Enact calling functions

#

Enact interface

    cudaError_t Enact(
        VertexId src,
        std::string traversal_mode = "LB")
    {
#

Select the traversal mode and SM version, then dispatch the call

        return EnactSSSP<...>(src);
    }
#

Init interface

    cudaError_t Init(...)
    {
#

Select the traversal mode and SM version, then dispatch the call

        return InitSSSP<...>(...);
    }
    
};
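
#

A hypothetical end-to-end usage from the application side (simplified; the real test driver passes many more parameters):

    SSSPEnactor<Problem> enactor(...); // construct the enactor
    enactor.Init(context, &problem);   // spawn controlling threads, select kernel policies
    enactor.Reset();                   // per-run reset; threads go to Wait
    enactor.Enact(src, "LB");          // run SSSP from src with load-balanced advance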