Published: 2022-01-29 | Category: [»] Programming.

In the computer industry, a job represents a small piece of action that can be performed independently of the rest of the software. Here, I present a system that can execute such actions concurrently (i.e. in parallel) and manage the dependencies (inter-relations) between them. Note that the word “small” must be lit under the context of the overall software as the complexity can vary between several orders of magnitude.

The complete program source code can be downloaded at [∞] our companion website under a LGPLv3 license.

The system described here is part of a larger structure that was developed for the #DevOptical series the last past months. However, due to its overall complexity, I chose to break it down in three distinct parts. This post is part #1.

To illustrate the way the library operates, I will use a practical example. It is at the same time both a very good example and a poor one, for reasons that will appear clearer later in this post. The example here is mostly a pretext used to illustrate the technology as the job system can handle about any tasks.

I will use the [∞] phase stepping formula of Pr. P. Hariharan which converts interferometric fringes images into phase and contrast information. I have been working around with this formula for the past 7 years so it was kind of logical decision for me to pick this formula. If you don’t know what the formula is about, just ignore the context as we only need the math expression. It is written as:

where Ix are the 5 phase stepping images acquired with a spacing of π/4.

We can re-arrange this formula as

with

Now you may wonder, what’s the relation with our job system?

To make the transition, consider that each of these mathematical operations (+, -, /, square, atan, sqrt) actually applies on 2-dimensional images which we can think of as small pieces of independent actions with some dependencies. There is one action to compute I3-I0, another one to compute I0-I2, yet another to compute I4-I2 and so on.

We can represent the dependencies as the directed graph of Figure 1. I added letters to each of the actions to identify them later on more easily.

From this graph representation, we can already draw some important conclusions. Jobs A, B, C, E and G are independent and can therefore be run concurrently. Same for J and L, N and P etc. On the other hand, D cannot be performed before B and C are done. Similarly, F cannot be performed before E is done etc. To save time, we could therefore run in parallel all the jobs that are independent and wait for their completion before moving on to the next operations. That is specifically the concept of the library that has been developed here.

In the library, jobs are derived from a job_base master class. The action they need to perform is handled by a _run() command that you need to implement. Jobs also have a current status which can be “pending” (waiting for processing), “processing” (the action is currently being performed) and “done” (the action has been performed to completion). Other status exists but are more technical, such as “error” which state that the job action has been aborted due to an internal exception.

The way the worker thread pool operates is shown in Figure 2. The list is browsed from left to right until the system finds a worker thread that is currently idling. It then assigns the job to this worker thread and signal some work is ready for processing. If all the working threads are busy, the system wait for a short period of time (typically 1 ms) and start browsing the list again until it finds some free capacity.

When workers have been idling for a too long period of time (by default 10 seconds), they are stopped to prevent wasting system resources. Threads are automatically restarted when jobs are assigned to them.

We can now tackle how we go from the job representation of Figure 1 to the assignment procedure of Figure 2. This is handled by the jobs manager.

When we assign a specific job to the jobs manager (such as the job that computes the phase or the job that computes the contrast in Figure 1), the system walks across the dependencies in a bottom-up manner. For instance, the phase job M requires the jobs K and L to be performed first. The job K itself needs J to be performed, J needs A to be performed and so on. For this to work, you must specify the dependencies of each of your jobs using the add_dependency function of the job_base class as you create your job class.

As the jobs manager walks across the dependencies, it populates the list of Figure 3.

As long as the list is not empty, the jobs manager will pick a job that is ready to be processed and assign it to the worker thread pool of Figure 2. To identify if a job is ready to be processed, the jobs manager walk across the list and test for each entry if all their dependencies have the status “done”. If a job dependencies are all done but the job itself is marked as “pending”, it can be processed. When a job is ready to be processed, it is removed from the pending list of Figure 3. If the jobs manager reaches the end of the list but no jobs are currently ready to be processed, it waits for a small period of time (typically 10 ms) and start browsing the list again until it finds something to be done.

If you pay attention to Figure 3, you will see that some jobs appear several time. This is not an issue because the system will automatically prune any job that is not in the pending state and duplicates will either be in “processing” or “done” state.

To complete our journey, let’s now look at the application level. We have the two jobs “phase” and “contrast” of Figure 1 that we have assigned to the jobs manager and we need to wait for their completions before we can move on to the rest of the program. There are two ways to handle this behaviour. Either we wait and stall the program until the jobs are done, or we continue doing other operations and call a specific function when the result of the computation is ready. The former is implemented using the wait_for() function and the latter is implemented using the set_done_action() function.

In the example I’m using the wait_for() method because it is not too much of an issue for a console program to stop its operation and wait for data. This is however not the case in interactive programs where you don’t want the user interface to stall waiting for some data!

The wait_for() mechanism implies some consequence. Since the current thread is paused waiting for the job completion, this frees some extra processing capacity at the processor level. However, our worker thread pool has a fixed size and won’t therefore reflect that change in capacity. Even worse, if you have jobs that themselves call wait_for you can end up in a situation where all threads are pending but no more capacity is available to process the new data which is required to unlock the pending jobs!

To alleviate both of these issues, it is possible to allocate extra capacity to the worker thread pool using the allow_more() function. The wait_for() function make usage of this mechanism by allowing 1 extra worker as we wait for the current jobs to be processed. The allow_more() function will return an object which will take care of automatic cleaning of the extra capacity as it is destructed through a RIAA idiom. So, all you need to do is to call the function and the extra capacity will only be allocated for the current scope:

```// temporarily allow one more thread auto allow_more(void) { class extra_thread_allowance { public: extra_thread_allowance(std::shared_ptr<worker_thread> p) { this->m_thread = p; } extra_thread_allowance(extra_thread_allowance&& other) { this->m_thread = std::move(other.m_thread); } extra_thread_allowance(const extra_thread_allowance&) = delete; const extra_thread_allowance& operator=(const extra_thread_allowance&) = delete; ~extra_thread_allowance(void) { if (this->m_thread != nullptr) this->m_thread->clean_stop(); this->m_thread = nullptr; } private: std::shared_ptr<worker_thread> m_thread; }; // create thread with infinite timing auto p = std::make_shared<worker_thread>(0); _debug("creating a new temporary thread %p", p.get()); p->start(); // add to list { AUTOLOCK(this->m_lock); this->m_extra_threads.push_back(p); } // return object that will self kill thread once destructed return extra_thread_allowance(p); }```

To illustrate, here is the wait_for() function:

```// wait for static void wait_for(std::list<std::shared_ptr<job_base>> list) { _debug("waiting for %zu jobs", list.size()); // temporarily increase number of allowed threads auto obj = ::get_instance<thread_pool_manager>()->allow_more(); // queue jobs for (auto& p : list) get_instance<jobs_manager>()->queue(p); // wait for job to finish for (auto& p : list) { if (p != nullptr) p->wait(); } // eventually throw error for (auto& p : list) { if (__ISNULLPTR(p)) continue; // throw error if(p->get_status() == job_base::status::error) throw_exception(job_exception, p->get_error_message()); // job should be finished by now if (p->get_status() != job_base::status::done) throw_exception(job_done_exception); } }```

The function allocates some extra capacity for the scope of the function. That is, the extra capacity will be freed as the function returns or triggers an error. We then queue all the jobs passed to the function to the jobs manager and wait for their completion. Note that it is important to first push everything before waiting. You don’t want to push & wait one at a time or the jobs will be ran in series rather than in parallel. Finally, we check that all the jobs completed nicely and eventually trigger an exception if not.

At the application level, we create a job called hariharan which will compute both the phase and contrast and wait for it:

```// create hariharan job printf("creating job... "); auto p = std::make_shared<hariharan>(fringes); printf("done\r\n"); // queue job printf("waiting for job... "); wait_for({ p }); printf("done\r\n");```

The hariharan job is defined as:

```#pragma once #include "core/jobs/jobs.h" #include "core/improc/map2d.h" #include "rev_polish_job.h" /* hariharan job class */ class hariharan : public job_base { public: // ctor hariharan(const std::array<map2d<float>, 5>& fringes) { // convert fringes to rev_polish_job_base objects std::vector<std::shared_ptr<rev_polish_job_base>> fringes_objects; for (size_t i = 0; i < 5; i++) fringes_objects.push_back(std::make_shared<rev_polish_job_ref>(fringes[i])); // create T objects auto T1 = create_reverse_polish_job(fringes_objects, "&1 &3 -"); auto T2 = create_reverse_polish_job(fringes_objects, "&2 &0 - &2 &4 - +"); auto T3 = create_reverse_polish_job(fringes_objects, "&2 &1 &0 + + &2 &4 &3 + + +"); std::vector<std::shared_ptr<rev_polish_job_base>> t_objects; t_objects.push_back(nullptr); t_objects.push_back(T1); t_objects.push_back(T2); t_objects.push_back(T3); // create phase and contrast dependencies this->m_phase = create_reverse_polish_job(t_objects, "&2 neg 2 &1 * neg atan2"); this->m_contrast = create_reverse_polish_job(t_objects, "2 &3 * 3 4 &1 ^2 * &2 ^2 + sqrt * /"); // add as dependencies add_dependency(this->m_phase); add_dependency(this->m_contrast); } // return phase const map2d<float>& get_phase(void) const { if (__ISNULLPTR(this->m_phase)) throw_exception(null_job_exception); return this->m_phase->get(); } // return contrast const map2d<float>& get_contrast(void) const { if (__ISNULLPTR(this->m_contrast)) throw_exception(null_job_exception); return this->m_contrast->get(); } private: // nothing to do virtual void _run(void) override {} // these will hold the results std::shared_ptr<rev_polish_job_base> m_contrast, m_phase; };```

It is worth noting that although I could have implemented the graph of Figure 1 directly, I preferred opting for a reverse polish calculation method which is much more general and re-usable. Reverse polish calculations are a form of [∞] stack-oriented programming and are very potent at ordering jobs like we do here. For instance, ((I0 + I1) + I2) + ((I3 + I4) + I2) can be written as I2 I1 I0 + + I2 I4 I3 + + +. I strongly recommend reading the Wikipedia page if you don’t know yet about stack-oriented programming or reverse polish notation. The implementation of the system goes a bit beyond the scope of this post but you can read the create_reverse_polish_job() function if you would like to know more.

After creating the three jobs T1, T2 and T3 we create the two jobs phase and contrast and add them as dependencies. Job and contrast will have their own dependencies that are built on the Ti, leading exactly to the structure of Figure 1. The hariharan job itself execute nothing (the _run() method is empty) because everything is contained in the two jobs phase and contrast. The class implement two functions get_phase() and get_contrast() to retrieve the results. And that is everything it takes to compute the Hariharan formula concurrently in our job system.

Let’s now discuss a bit about the results of the program, mainly in terms of benchmarking.

The time taken to process the Hariharan job for varying number of worker threads is shown in Figure 4. I have also added as a red line in the plot the time needed for the direct, non-jobbed, version of the formula.

The system clearly does not perform well as there is minor to none scalability with the number of worker threads and that it performs much worse than the non-jobbed version. This can be explained through several factors:

1) Not all operations take the same time. The + and – operations are among the fastest, followed by multiplication and divisions which requires several cycles. Square root and arc-tangent are even more costly operation and they probably concentrate the time spent during the processing. This basically kills all possibility to improve performance because the jobs that run mostly parallel are the one that takes the less time to compute (+ and -).

2) Division of the operations increase overhead. Dividing the Hariharan formula into separate actions like we did has two major effects: (a) it creates a lot of temporary maps which are located at different places of memory and probably induce tons of cache misses; (b) each operation will scan the maps in a double “for-loop” which also increases the overhead (see below).

3) The job system itself has some small overhead as we have seen that it waits by chunks of 10 ms by default which might not be good within the scale of the Hariharan job.

To understand why scanning produces overhead you need to know some implementation details of the library. When I wrote the code, I had the intend to make it as safe as possible rather than fast. I have written tons of programs, professionally, who made trade-offs between safety and speed because the major concern was speed rather than safety, but here I wanted safe operations in all cases. If you look at the function that access a map’s pixel data you will see:

```// get pixel (non-const version) type& operator()(size_t x, size_t y) { // throw error if beyond dimensions if (x >= this->m_width || y >= this->m_height) throw_exception(array_out_of_bounds_exception); // return pixel reference return this->m_data.at(MAKE_XY(x, y, this->m_width)); } /* ... */ // get data at type& at(const size_t index) { if(index >= this->m_num_elems) throw_exception(memory_out_of_bound_exception); return *MAKE_AT(this->m_data, index); } /* ... */ // generic safe cast will not compile template<typename type1, typename type2> type1 safe_cast(const type2 value) { auto ret = (type1)value; if ((type2)ret != value) throw_exception(safe_cast_exception); return ret; } #define MAKE_UINTPTR_T(value) safe_cast<uintptr_t>(value) template<typename type> type safe_add(const type val_a, const type val_b) { if (val_a >= unsigned_max_val<type>() - val_b) throw_exception(safe_add_exception); return val_a + val_b; } #define MAKE_ADD(val_a, val_b) safe_add(val_a, val_b) // safe multiplication template<typename type> type safe_mult(const type val_a, const type val_b) { auto countbits = [](type value) { size_t i = 0; while (((type)1 << i) < value && (i+1) < 8 * sizeof(type)) i++; return i; }; if (countbits(val_a) + countbits(val_b) > 8 * sizeof(type)) throw_exception(safe_multiply_exception); return val_a * val_b; } #define MAKE_MULT(val_a, val_b) safe_mult(val_a, val_b) // safe address computation template<typename type> type* safe_address(const type *base, const uintptr_t offset) { if ((uintptr_t)base >= UINTPTR_MAX - offset) throw_exception(safe_address_exception); return (type*)((unsigned char*)base + offset); } #define MAKE_ADDRESS(base, ofs) safe_address(base, MAKE_UINTPTR_T(ofs)) // increment pointer if not null template<typename type> type* safe_incptr(const type* base, const uintptr_t offset) { if (base == nullptr) return nullptr; return MAKE_ADDRESS(base, MAKE_MULT(offset, sizeof(type))); } #define MAKE_AT(base, ofs) safe_incptr(base, ofs) // macro that are usefull #define MAKE_XY(x, y, width) (MAKE_ADD(x, MAKE_MULT(y, width)))```

The purpose is to assert that no integer overflow will ever occur. This is the choice of safety which is made at the expense of execution time because the safe_xxx functions will require some time and these will be run at every pixel of every temporary map.

All these reasons explain why I said when introducing this post that Hariharan was at the same time a good example and a poor one. It is a good example because it illustrate quite well what a job can be and what are inter-dependences between jobs, but, at the same time, the example is poorly chosen because it offers worse performance in a jobbed environment than in a non-jobbed environment.

That being said, Figure 1 is not the only way to implement the Hariharan formula in a jobbed system. Up to now, we took the decision to parallelize operations between the map. It is also possible to parallelize the operation themselves.

You can picture yourself the fringes images, contrast and phase maps as a set of 1D data where the Hariharan formula is applied independently to each of the entry of the set. This is called a per-pixel operation and it is very easy to parallelize.

What we can do is to create ‘n’ worker threads and divide the computation into ‘k*n’ consecutive blocks where k is an integer greater than or equal to 1. We then create ‘k*n’ jobs to process these blocks concurrently. The value of ‘n’ will reach an optimum depending on the operating conditions (action to be performed, size of the blocks, processor, cache speed etc.) and ‘k’ is usually chosen as 1 but any other value can be tested as well too. You can either use a default value for ‘n’, use an heuristic or a database depending on the operating condition. I used both approach professionally and the database offer a slight advantage on small maps (256×256 or less) for fast operations like addition, subtraction etc. It however requires populating the database which is a long and tedious process and requires to be performed on the target machine in ideal conditions (no other program running etc.).

Since this is beyond the scope of this article, we will stick to fixing the value of ‘n’ ourselves.

The code is:

```class parallel_job : public job_base { public: using func_t = std::function<void(const size_t x, const size_t y)>; parallel_job(size_t xmin, size_t xmax, size_t ymin, size_t ymax, func_t func) { this->m_xmin = xmin; this->m_xmax = xmax; this->m_ymin = ymin; this->m_ymax = ymax; this->m_func = func; } private: virtual void _run(void) { if (!this->m_func) return; for (size_t y = this->m_ymin; y < this->m_ymax; y++) for (size_t x = this->m_xmin; x < this->m_xmax; x++) this->m_func(x, y); } size_t m_xmin, m_xmax, m_ymin, m_ymax; func_t m_func; }; /* ... */ map2d<float> res_phase(width, height), res_contrast(width, height); size_t y_step = (height + num_workers - 1) / num_workers; auto _func = [&](const size_t x, const size_t y) { auto T1 = fringes[3](x, y) - fringes[1](x, y); auto T2 = fringes[0](x, y) + fringes[4](x, y) - 2.0f * fringes[2](x, y); auto T3 = fringes[0](x, y) + fringes[1](x, y) + 2.0f * fringes[2](x, y) + fringes[3](x, y) + fringes[4](x, y); res_phase(x, y) = std::atan2(-2.0f * T1, -T2); res_contrast(x, y) = 3.0f * sqrt(4.0f * T1 * T1 + T2 * T2) / (2.0f * T3); }; // run jobs std::list<std::shared_ptr<job_base>> list; for (size_t i = 0; i < num_workers; i++) list.push_back(std::make_shared<parallel_job>(0, width, i * y_step, std::min((i + 1) * y_step, height), _func)); wait_for(list);```

For which we obtain the benchmarking results of Figure 5.

The scalability is bit better but it is not the best performances I have seen. As a comparison, the implementation I did for my employer and which make usage of the GPU performs the same computation in much less than 1 ms (>100× faster). Again, the safety mechanism and the small overhead of the job system might explain what is observed.

Most of all, the purpose of this post was not to perform image transforms but to introduce you, in a simple way, to the job system that was implemented. As we will move in the #DevOptical series, I will show how this system can be used in the context of lens optimization. But this will come later!

Don’t forget to stay tuned to watch the second part of this series!

I would like to give a big thanks to James, Naif, Lilith, Cam, Samuel, Themulticaster, Sivaraman, Vaclav, Arif and Jesse who have supported this post through [∞] Patreon. I also take the occasion to invite you to donate through Patreon, even as little as \$1. I cannot stress it more, you can really help me to post more content and make more experiments!

[⇈] Top of Page