Control Structures

Detailed Description

The primary way Agency programs create execution is by invoking a control structure. Control structures are functions invoked via composition with an execution policy. Execution policies parameterize control structures by describing the properties of the requested execution.

For example, the following code snippet uses the bulk_invoke() control structure with the par execution policy to require the parallel execution of ten invocations of a lambda function:

using namespace agency;
bulk_invoke(par(10), [](parallel_agent& self)
{
// task body here
...
});

Functions

template<class ExecutionPolicy , class Function , class... Args>
see_below agency::bulk_async (ExecutionPolicy &&policy, Function f, Args &&...args)
 Creates a bulk asynchronous invocation.
 
template<class ExecutionPolicy , class Function , class... Args>
see_below agency::bulk_invoke (ExecutionPolicy &&policy, Function f, Args &&...args)
 Creates a bulk synchronous invocation.
 
template<class ExecutionPolicy , class Function , class Future , class... Args>
see_below agency::bulk_then (ExecutionPolicy &&policy, Function f, Future &predecessor, Args &&...args)
 Creates a bulk continuation.
 

Function Documentation

template<class ExecutionPolicy , class Function , class... Args>
see_below agency::bulk_async ( ExecutionPolicy &&  policy,
Function  f,
Args &&...  args 
)

bulk_async is a control structure which asynchronously creates a group of function invocations with forward progress ordering as required by an execution policy. The results of these invocations, if any, are collected into a container and returned as bulk_async's asynchronous result. A future object corresponding to the eventual availability of this container is returned as bulk_async's result.

bulk_async asynchronously creates a group of function invocations of size N, and each invocation i in [0,N) has the following form:

result_i = f(agent_i, arg_i_1, arg_i_2, ..., arg_i_M)

agent_i is a reference to an execution agent which identifies the ith invocation within the group. The parameter arg_i_j depends on the M arguments arg_j passed to bulk_async:

  • If arg_j is a shared parameter, then arg_i_j is a reference to an object shared among all execution agents in agent_i's group.
  • Otherwise, arg_i_j is a copy of argument arg_j.

If the invocations of f do not return void, these results are collected and returned in a container results, whose type is implementation-defined. If invocation i returns result_i, and this invocation's agent_i has index idx_i, then results[idx_i] yields result_i.
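The indexing rule above can be illustrated without Agency itself. The following is a minimal sketch in standard C++ only: toy_bulk_async and squares_example are hypothetical names invented for this illustration, a plain integer index stands in for the execution agent, and the invocations run one after another inside a single std::async task. It models only the shape of the result (a future whose container satisfies results[idx_i] == result_i), not Agency's scheduling or its actual container type.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <vector>

// Hypothetical stand-in for bulk_async, written in standard C++ only.
// NOT Agency's implementation: the n invocations run sequentially inside
// one std::async task, and the index i plays the role of agent_i.
template<class Function>
auto toy_bulk_async(std::size_t n, Function f)
  -> std::future<std::vector<decltype(f(std::size_t(0)))>>
{
  using result_type = decltype(f(std::size_t(0)));
  return std::async(std::launch::async, [n, f]() mutable -> std::vector<result_type>
  {
    std::vector<result_type> results(n);
    for(std::size_t i = 0; i < n; ++i)
    {
      results[i] = f(i);  // results[idx_i] holds result_i
    }
    return results;
  });
}

// Example: five invocations, each returning the square of its index.
std::vector<int> squares_example()
{
  auto fut = toy_bulk_async(5, [](std::size_t i) { return static_cast<int>(i * i); });
  return fut.get();  // blocks until the container of results is available
}
```

Calling squares_example() yields a container in which element i is the value returned by invocation i, which is the property the paragraph above describes for results.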

Parameters
policy: An execution policy describing the requirements of the execution agents created by this call to bulk_async.
f: A function defining the work to be performed by execution agents.
args: Additional arguments to pass to f when it is invoked.
Returns
A void future object, if f has no result; otherwise, a future object corresponding to the eventually available container of f's results indexed by the execution agent which produced them.
Note
The type of future object returned by bulk_async is a property of the type of ExecutionPolicy used as a parameter.
Template Parameters
ExecutionPolicy: This type must fulfill the requirements of ExecutionPolicy.
Function: Function's first parameter type must be ExecutionPolicy::execution_agent_type&. The types of its additional parameters must match Args....
Args: Each type in Args... must match the type of the corresponding parameter of Function.

The following example demonstrates how to use bulk_async to create tasks which execute asynchronously with the caller.

#include <agency/agency.hpp>
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

int main()
{
  using namespace agency;
  std::cout << "Starting two tasks asynchronously..." << std::endl;
  std::mutex mut;
  // asynchronously create 5 agents to greet us in bulk
  auto f1 = bulk_async(par(5), [&](parallel_agent& self)
  {
    mut.lock();
    std::cout << "Hello, world from agent " << self.index() << " in task 1" << std::endl;
    mut.unlock();
  });
  // asynchronously create 5 more agents to greet us in bulk
  auto f2 = bulk_async(par(5), [&](parallel_agent& self)
  {
    mut.lock();
    std::cout << "Hello, world from agent " << self.index() << " in task 2" << std::endl;
    mut.unlock();
  });
  std::cout << "Sleeping before waiting on the tasks..." << std::endl;
  std::this_thread::sleep_for(std::chrono::seconds(1));
  std::cout << "Woke up, waiting for the tasks to complete..." << std::endl;
  // wait for tasks 1 & 2 to complete before continuing
  f1.wait();
  f2.wait();
  std::cout << "OK" << std::endl;
  return 0;
}

Messages from the agents in the two asynchronous tasks are printed while the main thread sleeps:

$ clang -std=c++11 -I. -lstdc++ -pthread examples/hello_async.cpp -o hello_async
$ ./hello_async
Starting two tasks asynchronously...
Sleeping before waiting on the tasks...
Hello, world from agent 0 in task 1
Hello, world from agent 1 in task 1
Hello, world from agent 2 in task 1
Hello, world from agent 3 in task 1
Hello, world from agent 4 in task 1
Hello, world from agent 0 in task 2
Hello, world from agent 1 in task 2
Hello, world from agent 2 in task 2
Hello, world from agent 3 in task 2
Hello, world from agent 4 in task 2
Woke up, waiting for the tasks to complete...
OK

See also
bulk_invoke
bulk_then

template<class ExecutionPolicy , class Function , class... Args>
see_below agency::bulk_invoke ( ExecutionPolicy &&  policy,
Function  f,
Args &&...  args 
)

bulk_invoke is a control structure which creates a group of function invocations with forward progress ordering as required by an execution policy. The results of these invocations, if any, are collected into a container and returned as bulk_invoke's result.

bulk_invoke creates a group of function invocations of size N, and each invocation i in [0,N) has the following form:

result_i = f(agent_i, arg_i_1, arg_i_2, ..., arg_i_M)

agent_i is a reference to an execution agent which identifies the ith invocation within the group. The parameter arg_i_j depends on the M arguments arg_j passed to bulk_invoke:

  • If arg_j is a shared parameter, then arg_i_j is a reference to an object shared among all execution agents in agent_i's group.
  • Otherwise, arg_i_j is a copy of argument arg_j.

If the invocations of f do not return void, these results are collected and returned in a container results, whose type is implementation-defined. If invocation i returns result_i, and this invocation's agent_i has index idx_i, then results[idx_i] yields result_i.
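The difference between a shared parameter and an ordinary argument can be modelled in standard C++ alone. In the sketch below, toy_bulk_invoke, demo_outcome, and parameter_example are hypothetical names invented for this illustration and are not part of Agency. The model runs its invocations sequentially, so it needs no synchronization; with a genuinely parallel policy, concurrent writes to a shared object would have to be synchronized by the caller.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical model of bulk_invoke's parameter passing (standard C++ only).
// 'shared' plays the role of a shared parameter: every invocation receives a
// reference to the same object. 'arg' plays the role of an ordinary argument:
// every invocation receives its own copy.
template<class Function, class Shared, class Arg>
auto toy_bulk_invoke(std::size_t n, Function f, Shared& shared, const Arg& arg)
  -> std::vector<decltype(f(std::size_t(0), shared, std::declval<Arg&>()))>
{
  using result_type = decltype(f(std::size_t(0), shared, std::declval<Arg&>()));
  std::vector<result_type> results(n);
  for(std::size_t i = 0; i < n; ++i)
  {
    Arg private_copy = arg;                   // arg_i_j: a fresh copy per invocation
    results[i] = f(i, shared, private_copy);  // results[idx_i] holds result_i
  }
  return results;
}

struct demo_outcome
{
  std::vector<int> results;
  int shared_counter;
};

// Example: each invocation increments both the shared counter and its own copy.
demo_outcome parameter_example()
{
  int counter = 0;
  auto results = toy_bulk_invoke(4, [](std::size_t, int& shared, int& copy)
  {
    ++shared;     // visible to every other invocation
    ++copy;       // affects only this invocation's private copy
    return copy;
  }, counter, 10);
  return demo_outcome{results, counter};
}
```

In this model every invocation returns 11, because each starts from its own copy of 10, while the shared counter ends at 4, because all four invocations modified the same object.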

Parameters
policy: An execution policy describing the requirements of the execution agents created by this call to bulk_invoke.
f: A function defining the work to be performed by execution agents.
args: Additional arguments to pass to f when it is invoked.
Returns
void, if f has no result; otherwise, a container of f's results indexed by the execution agent which produced them.
Template Parameters
ExecutionPolicy: This type must fulfill the requirements of ExecutionPolicy.
Function: Function's first parameter type must be ExecutionPolicy::execution_agent_type&. The types of its additional parameters must match Args....
Args: Each type in Args... must match the type of the corresponding parameter of Function.

The following example demonstrates how to use bulk_invoke to print 10 "Hello, world" messages in sequence.

#include <agency/agency.hpp>
#include <iostream>

int main()
{
  // create 10 sequenced_agents to greet us in bulk
  agency::bulk_invoke(agency::seq(10), [](agency::sequenced_agent& self)
  {
    std::cout << "Hello, world from agent " << self.index() << std::endl;
  });
  return 0;
}

Messages from agents 0 through 9 are printed in sequential order:

$ clang -std=c++11 -I. -lstdc++ -pthread examples/hello_lambda.cpp -o hello_lambda
$ ./hello_lambda
Hello, world from agent 0
Hello, world from agent 1
Hello, world from agent 2
Hello, world from agent 3
Hello, world from agent 4
Hello, world from agent 5
Hello, world from agent 6
Hello, world from agent 7
Hello, world from agent 8
Hello, world from agent 9

Changing the execution policy used in the call to bulk_invoke changes how and where the execution agents will execute the provided function. This example demonstrates how to use bulk_invoke with par to execute the SAXPY operation in parallel:

#include <agency/agency.hpp>
#include <cassert>
#include <cstddef>
#include <iostream>
#include <vector>

int main()
{
  // set up some inputs
  std::size_t n = 16 << 20;
  std::vector<float> x(n, 1), y(n, 2);
  float a = 13.;
  // use par to execute SAXPY in parallel, and collect the results
  auto results = agency::bulk_invoke(agency::par(n), [&](agency::parallel_agent& self)
  {
    int i = self.index();
    return a * x[i] + y[i];
  });
  // check the result
  std::vector<float> ref(n, a * 1.f + 2.f);
  assert(ref == results);
  std::cout << "OK" << std::endl;
  return 0;
}

Remember to enable compiler optimization (-O3 in this example) so that the program executes quickly:

$ clang -std=c++11 -I. -lstdc++ -pthread -O3 examples/saxpy.cpp -o saxpy
$ ./saxpy 
OK

See also
bulk_async
bulk_then

template<class ExecutionPolicy , class Function , class Future , class... Args>
see_below agency::bulk_then ( ExecutionPolicy &&  policy,
Function  f,
Future &  predecessor,
Args &&...  args 
)

bulk_then is a control structure which asynchronously creates a group of function invocations with forward progress ordering as required by an execution policy. These invocations are a bulk continuation to a predecessor bulk asynchronous invocation. The predecessor bulk asynchronous invocation is represented by a future object, and the continuation will not execute until the predecessor's future object becomes ready. The results of the continuation's invocations, if any, are collected into a container and returned as bulk_then's asynchronous result. A future object corresponding to the eventual availability of this container is returned as bulk_then's result.

bulk_then asynchronously creates a group of function invocations of size N, and each invocation i in [0,N) has the following form:

result_i = f(agent_i, predecessor_arg, arg_i_1, arg_i_2, ..., arg_i_M)

agent_i is a reference to an execution agent which identifies the ith invocation within the group. The parameter predecessor_arg is a reference to the value of the future object used as a parameter to bulk_then. If this future object has a void value, then this parameter is omitted. The parameter arg_i_j depends on the M arguments arg_j passed to bulk_then:

  • If arg_j is a shared parameter, then arg_i_j is a reference to an object shared among all execution agents in agent_i's group.
  • Otherwise, arg_i_j is a copy of argument arg_j.

If the invocations of f do not return void, these results are collected and returned in a container results, whose type is implementation-defined. If invocation i returns result_i, and this invocation's agent_i has index idx_i, then results[idx_i] yields result_i.
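The continuation semantics described above, for a predecessor with a non-void value, can be sketched in standard C++ alone. Here toy_bulk_then, then_outcome, and continuation_example are hypothetical names invented for this illustration, a plain index stands in for the execution agent, and the invocations run sequentially inside one std::async task. The sketch shows three properties only: the continuation waits for the predecessor, every invocation receives a reference to the predecessor's value, and a non-shared predecessor future is invalid afterwards.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <utility>
#include <vector>

// Hypothetical stand-in for bulk_then with a non-void predecessor
// (standard C++ only; NOT Agency's implementation).
template<class Function, class T>
auto toy_bulk_then(std::size_t n, Function f, std::future<T>& predecessor)
  -> std::future<std::vector<decltype(f(std::size_t(0), std::declval<T&>()))>>
{
  using result_type = decltype(f(std::size_t(0), std::declval<T&>()));
  // C++11 lambdas cannot move-capture, so take ownership via a shared_future.
  // After share(), predecessor.valid() is false.
  std::shared_future<T> pred = predecessor.share();
  return std::async(std::launch::async, [n, f, pred]() mutable -> std::vector<result_type>
  {
    T value = pred.get();  // blocks until the predecessor becomes ready
    std::vector<result_type> results(n);
    for(std::size_t i = 0; i < n; ++i)
    {
      results[i] = f(i, value);  // predecessor_arg: the same object for every invocation
    }
    return results;
  });
}

struct then_outcome
{
  std::vector<int> results;
  bool predecessor_valid;
};

// Example: a predecessor producing 100, followed by three continuation invocations.
then_outcome continuation_example()
{
  std::future<int> predecessor = std::async(std::launch::async, [] { return 100; });
  auto continuation = toy_bulk_then(3, [](std::size_t i, int& base)
  {
    return base + static_cast<int>(i);
  }, predecessor);
  std::vector<int> results = continuation.get();
  return then_outcome{results, predecessor.valid()};
}
```

In this model the continuation's container holds 100, 101, and 102, and predecessor.valid() is false once toy_bulk_then has returned, mirroring the statement that a non-shared predecessor is invalid after the call.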

Parameters
policy: An execution policy describing the requirements of the execution agents created by this call to bulk_then.
f: A function defining the work to be performed by execution agents.
predecessor: A future object representing the predecessor task. Its future value, if it has one, is passed to f as an argument when f is invoked. After bulk_then returns, predecessor is invalid if it is not a shared future.
args: Additional arguments to pass to f when it is invoked.
Returns
A void future object, if f has no result; otherwise, a future object corresponding to the eventually available container of f's results indexed by the execution agent which produced them.
Template Parameters
ExecutionPolicy: This type must fulfill the requirements of ExecutionPolicy.
Function: Function's first parameter type must be ExecutionPolicy::execution_agent_type&. The types of its additional parameters must match Args....
Future: This type must fulfill the requirements of Future. If the value type of this Future is not void, this type must match the type of the second parameter of Function.
Args: Each type in Args... must match the type of the corresponding parameter of Function.

The following example demonstrates how to use bulk_then to sequence a continuation after a predecessor task:

#include <agency/agency.hpp>
#include <chrono>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>

int main()
{
  using namespace agency;
  std::cout << "Starting predecessor and continuation tasks asynchronously..." << std::endl;
  std::mutex mut;
  // asynchronously create 5 agents to greet us in a predecessor task
  std::future<void> predecessor = bulk_async(par(5), [&](parallel_agent& self)
  {
    mut.lock();
    std::cout << "Hello, world from agent " << self.index() << " in the predecessor task" << std::endl;
    mut.unlock();
  });
  // create a continuation to the predecessor
  std::future<void> continuation = bulk_then(par(5), [&](parallel_agent& self)
  {
    mut.lock();
    std::cout << "Hello, world from agent " << self.index() << " in the continuation" << std::endl;
    mut.unlock();
  },
  predecessor);
  std::cout << "Sleeping before waiting on the continuation..." << std::endl;
  std::this_thread::sleep_for(std::chrono::seconds(1));
  std::cout << "Woke up, waiting for the continuation to complete..." << std::endl;
  // wait for the continuation to complete before continuing
  continuation.wait();
  std::cout << "OK" << std::endl;
  return 0;
}

Messages from agents in the predecessor task are guaranteed to be output before messages from the continuation:

$ clang -std=c++11 -I. -lstdc++ -pthread examples/hello_then.cpp -o hello_then
$ ./hello_then
Starting predecessor and continuation tasks asynchronously...
Sleeping before waiting on the continuation...
Hello, world from agent 0 in the predecessor task
Hello, world from agent 1 in the predecessor task
Hello, world from agent 2 in the predecessor task
Hello, world from agent 3 in the predecessor task
Hello, world from agent 4 in the predecessor task
Hello, world from agent 0 in the continuation
Hello, world from agent 1 in the continuation
Hello, world from agent 2 in the continuation
Hello, world from agent 3 in the continuation
Hello, world from agent 4 in the continuation
Woke up, waiting for the continuation to complete...
OK

See also
bulk_invoke
bulk_async