Kokkos Core Kernels Package Version of the Day

Kokkos_HPX.hpp
//@HEADER
// ************************************************************************
//
// Kokkos v. 4.0
// Copyright (2022) National Technology & Engineering
// Solutions of Sandia, LLC (NTESS).
//
// Under the terms of Contract DE-NA0003525 with NTESS,
// the U.S. Government retains certain rights in this software.
//
// Part of Kokkos, under the Apache License v2.0 with LLVM Exceptions.
// See https://kokkos.org/LICENSE for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//@HEADER

#ifndef KOKKOS_IMPL_PUBLIC_INCLUDE
#include <Kokkos_Macros.hpp>
static_assert(false,
              "Including non-public Kokkos header files is not allowed.");
#endif
#ifndef KOKKOS_HPX_HPP
#define KOKKOS_HPX_HPP

#include <Kokkos_Macros.hpp>
#if defined(KOKKOS_ENABLE_HPX)

#include <Kokkos_Core_fwd.hpp>

#include <Kokkos_HostSpace.hpp>
#include <cstddef>
#include <iosfwd>

#ifdef KOKKOS_ENABLE_HBWSPACE
#include <Kokkos_HBWSpace.hpp>
#endif

#include <Kokkos_HostSpace.hpp>
#include <Kokkos_Layout.hpp>
#include <Kokkos_MemoryTraits.hpp>
#include <Kokkos_Parallel.hpp>
#include <Kokkos_ScratchSpace.hpp>
#include <Kokkos_TaskScheduler.hpp>
#include <impl/Kokkos_ConcurrentBitset.hpp>
#include <impl/Kokkos_FunctorAnalysis.hpp>
#include <impl/Kokkos_Tools.hpp>
#include <impl/Kokkos_TaskQueue.hpp>
#include <impl/Kokkos_InitializationSettings.hpp>

#include <KokkosExp_MDRangePolicy.hpp>

#include <hpx/local/algorithm.hpp>
#include <hpx/local/barrier.hpp>
#include <hpx/local/condition_variable.hpp>
#include <hpx/local/execution.hpp>
#include <hpx/local/future.hpp>
#include <hpx/local/init.hpp>
#include <hpx/local/mutex.hpp>
#include <hpx/local/runtime.hpp>
#include <hpx/local/thread.hpp>

#include <Kokkos_UniqueToken.hpp>

#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <type_traits>
#include <vector>

// There are currently two different implementations for the parallel dispatch
// functions:
//
// - 0: The HPX way. Unfortunately, this comes with unnecessary
//      overheads at the moment, so there is also
// - 1: The manual way. This uses for_loop, but only spawns one task per worker
//      thread. This is significantly faster in most cases.
//
// In the long run 0 should be the preferred implementation, but until HPX is
// improved 1 will be the default.
#ifndef KOKKOS_HPX_IMPLEMENTATION
#define KOKKOS_HPX_IMPLEMENTATION 1
#endif

#if (KOKKOS_HPX_IMPLEMENTATION < 0) || (KOKKOS_HPX_IMPLEMENTATION > 1)
#error "You have chosen an invalid value for KOKKOS_HPX_IMPLEMENTATION"
#endif

// [note 1]
//
// When using the asynchronous backend and independent instances, we explicitly
// reset the shared data at the end of a parallel task (execute_task). We do
// this to avoid circular references with shared pointers that would otherwise
// never be released.
//
// The HPX instance holds shared data for the instance in a shared_ptr. One of
// the pieces of shared data is the future that we use to sequence parallel
// dispatches. When a parallel task is launched, a copy of the closure
// (ParallelFor, ParallelReduce, etc.) is captured in the task. The closure
// also holds the policy, the policy holds the HPX instance, the instance holds
// the shared data (for use of buffers in the parallel task). When attaching a
// continuation to a future, the continuation is stored in the future (shared
// state). This means that there is a cycle future -> continuation -> closure
// -> policy -> HPX -> shared data -> future. We break this by releasing the
// shared data early, as (the pointer to) the shared data will not be used
// anymore by the closure at the end of execute_task.
//
// We also mark the shared instance data as mutable so that we can reset it
// from the const execute_task member function.
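The cycle described in [note 1] is easier to see in isolation. Below is a minimal, self-contained sketch of the same pattern using only the standard library; it is not part of this header, and the names State and keep are hypothetical stand-ins for the instance data and the captured shared_ptr:

#include <functional>
#include <iostream>
#include <memory>

struct State {
  // Plays the role of the future's shared state holding the closure.
  std::function<void()> continuation;
  ~State() { std::cout << "State released\n"; }
};

int main() {
  auto state = std::make_shared<State>();

  // The closure (think ParallelFor) captures a shared_ptr back to the state,
  // and the state stores the closure: state -> continuation -> state.
  state->continuation = [keep = state]() mutable {
    // ... run the parallel region ...
    keep.reset();  // release early, as execute_task does per [note 1]
  };

  state->continuation();
  state.reset();  // prints "State released"; without keep.reset() it leaks
}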
namespace Kokkos {
namespace Impl {
class thread_buffer {
  static constexpr std::size_t m_cache_line_size = 64;

  std::size_t m_num_threads;
  std::size_t m_size_per_thread;
  std::size_t m_size_total;
  char *m_data;

  void pad_to_cache_line(std::size_t &size) {
    size = ((size + m_cache_line_size - 1) / m_cache_line_size) *
           m_cache_line_size;
  }

 public:
  thread_buffer()
      : m_num_threads(0),
        m_size_per_thread(0),
        m_size_total(0),
        m_data(nullptr) {}
  thread_buffer(const std::size_t num_threads,
                const std::size_t size_per_thread)
      : thread_buffer() {
    // Delegate to the default constructor first so that resize reads
    // initialized members (m_size_total, m_data).
    resize(num_threads, size_per_thread);
  }
  ~thread_buffer() { delete[] m_data; }

  thread_buffer(const thread_buffer &) = delete;
  thread_buffer(thread_buffer &&) = delete;
  thread_buffer &operator=(const thread_buffer &) = delete;
  thread_buffer &operator=(thread_buffer) = delete;

  void resize(const std::size_t num_threads,
              const std::size_t size_per_thread) {
    m_num_threads = num_threads;
    m_size_per_thread = size_per_thread;

    pad_to_cache_line(m_size_per_thread);

    std::size_t size_total_new = m_num_threads * m_size_per_thread;

    if (m_size_total < size_total_new) {
      delete[] m_data;
      m_data = new char[size_total_new];
      m_size_total = size_total_new;
    }
  }

  char *get(std::size_t thread_num) {
    assert(thread_num < m_num_threads);
    if (m_data == nullptr) {
      return nullptr;
    }
    return &m_data[thread_num * m_size_per_thread];
  }

  std::size_t size_per_thread() const noexcept { return m_size_per_thread; }
  std::size_t size_total() const noexcept { return m_size_total; }
};
} // namespace Impl
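As a usage illustration (a sketch, not from this header; example_partial_sums and the double value type are hypothetical): the reduction code further below resizes the buffer once for the worker count, then each worker writes through its own cache-line-padded slot so partial results never share a line.

void example_partial_sums(Kokkos::Impl::thread_buffer &buffer,
                          const int num_worker_threads) {
  // One padded slot per worker; each slot is at least sizeof(double) bytes,
  // rounded up to the 64-byte cache line by pad_to_cache_line.
  buffer.resize(num_worker_threads, sizeof(double));

  for (int t = 0; t < num_worker_threads; ++t) {
    // get(t) == get(0) + t * size_per_thread(), so slots never overlap.
    *reinterpret_cast<double *>(buffer.get(t)) = 0.0;
  }
}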

namespace Experimental {
class HPX {
 public:
  static constexpr uint32_t impl_default_instance_id() { return 1; }

 private:
  static bool m_hpx_initialized;
  uint32_t m_instance_id = impl_default_instance_id();

#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
  static std::atomic<uint32_t> m_next_instance_id;

 public:
  enum class instance_mode { default_, independent };

 private:
  static uint32_t m_active_parallel_region_count;
  static hpx::spinlock m_active_parallel_region_count_mutex;
  static hpx::condition_variable_any m_active_parallel_region_count_cond;

  struct instance_data {
    instance_data() = default;
    instance_data(hpx::shared_future<void> future) : m_future(future) {}
    Kokkos::Impl::thread_buffer m_buffer;
    hpx::shared_future<void> m_future = hpx::make_ready_future<void>();
    hpx::spinlock m_future_mutex;
  };

  mutable std::shared_ptr<instance_data> m_independent_instance_data;
  static instance_data m_default_instance_data;

  std::reference_wrapper<Kokkos::Impl::thread_buffer> m_buffer;
  std::reference_wrapper<hpx::shared_future<void>> m_future;
  std::reference_wrapper<hpx::spinlock> m_future_mutex;
#else
  static Kokkos::Impl::thread_buffer m_default_buffer;
#endif

 public:
  using execution_space = HPX;
  using memory_space = HostSpace;
  using device_type = Kokkos::Device<execution_space, memory_space>;
  using array_layout = LayoutRight;
  using size_type = memory_space::size_type;
  using scratch_memory_space = ScratchMemorySpace<HPX>;

#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
  HPX() noexcept
      : m_instance_id(impl_default_instance_id()),
        m_buffer(m_default_instance_data.m_buffer),
        m_future(m_default_instance_data.m_future),
        m_future_mutex(m_default_instance_data.m_future_mutex) {}

  HPX(instance_mode mode)
      : m_instance_id(mode == instance_mode::independent
                          ? m_next_instance_id++
                          : impl_default_instance_id()),
        m_independent_instance_data(mode == instance_mode::independent
                                        ? (new instance_data())
                                        : nullptr),
        m_buffer(mode == instance_mode::independent
                     ? m_independent_instance_data->m_buffer
                     : m_default_instance_data.m_buffer),
        m_future(mode == instance_mode::independent
                     ? m_independent_instance_data->m_future
                     : m_default_instance_data.m_future),
        m_future_mutex(mode == instance_mode::independent
                           ? m_independent_instance_data->m_future_mutex
                           : m_default_instance_data.m_future_mutex) {}

  HPX(hpx::shared_future<void> future)
      : m_instance_id(m_next_instance_id++),
        m_independent_instance_data(new instance_data(future)),
        m_buffer(m_independent_instance_data->m_buffer),
        m_future(m_independent_instance_data->m_future),
        m_future_mutex(m_independent_instance_data->m_future_mutex) {}

  HPX(HPX &&other) = default;
  HPX &operator=(HPX &&other) = default;
  HPX(const HPX &other) = default;
  HPX &operator=(const HPX &other) = default;
#else
  HPX() noexcept {}
#endif

  void print_configuration(std::ostream &os, bool /*verbose*/ = false) const {
    os << "HPX backend\n";
    os << "HPX Execution Space:\n";
    os << "  KOKKOS_ENABLE_HPX: yes\n";
    os << "\nHPX Runtime Configuration:\n";
  }
  uint32_t impl_instance_id() const noexcept { return m_instance_id; }

#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
  static bool in_parallel(HPX const &instance = HPX()) noexcept {
    return !instance.impl_get_future().is_ready();
  }
#else
  static bool in_parallel(HPX const & = HPX()) noexcept { return false; }
#endif

#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
  static void impl_decrement_active_parallel_region_count() {
    std::unique_lock<hpx::spinlock> l(m_active_parallel_region_count_mutex);
    if (--m_active_parallel_region_count == 0) {
      l.unlock();
      m_active_parallel_region_count_cond.notify_all();
    }
  }

  static void impl_increment_active_parallel_region_count() {
    std::unique_lock<hpx::spinlock> l(m_active_parallel_region_count_mutex);
    ++m_active_parallel_region_count;
  }
#endif

  void fence(
      const std::string &name =
          "Kokkos::Experimental::HPX::fence: Unnamed Instance Fence") const {
    Kokkos::Tools::Experimental::Impl::profile_fence_event<
        Kokkos::Experimental::HPX>(
        name,
        Kokkos::Tools::Experimental::Impl::DirectFenceIDHandle{
            impl_instance_id()},
        [&]() {
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
          impl_get_future().wait();
          // Reset the future to free variables that may have been captured in
          // parallel regions.
          impl_get_future() = hpx::make_ready_future<void>();
#endif
        });
  }

  static void impl_static_fence(const std::string &name) {
    Kokkos::Tools::Experimental::Impl::profile_fence_event<
        Kokkos::Experimental::HPX>(
        name,
        Kokkos::Tools::Experimental::SpecialSynchronizationCases::
            GlobalDeviceSynchronization,
        [&]() {
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
          std::unique_lock<hpx::spinlock> l(
              m_active_parallel_region_count_mutex);
          m_active_parallel_region_count_cond.wait(
              l, [&]() { return m_active_parallel_region_count == 0; });
          // Reset the future to free variables that may have been captured in
          // parallel regions (however, we don't have access to futures from
          // instances other than the default instance; they will only be
          // released by fence).
          HPX().impl_get_future() = hpx::make_ready_future<void>();
#endif
        });
  }

  static hpx::execution::parallel_executor impl_get_executor() {
    return hpx::execution::parallel_executor();
  }

  static bool is_asynchronous(HPX const & = HPX()) noexcept {
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
    return true;
#else
    return false;
#endif
  }
#ifdef KOKKOS_ENABLE_DEPRECATED_CODE_3
  template <typename F>
  KOKKOS_DEPRECATED static void partition_master(
      F const &, int requested_num_partitions = 0, int = 0) {
    if (requested_num_partitions > 1) {
      Kokkos::abort(
          "Kokkos::Experimental::HPX::partition_master: can't partition an "
          "HPX instance\n");
    }
  }
#endif

  static int concurrency();
  static void impl_initialize(InitializationSettings const &);
  static bool impl_is_initialized() noexcept;
  static void impl_finalize();

  static int impl_thread_pool_size() noexcept {
    hpx::runtime *rt = hpx::get_runtime_ptr();
    if (rt == nullptr) {
      return 0;
    } else {
      if (hpx::threads::get_self_ptr() == nullptr) {
        return hpx::resource::get_thread_pool(0).get_os_thread_count();
      } else {
        return hpx::this_thread::get_pool()->get_os_thread_count();
      }
    }
  }

  static int impl_thread_pool_rank() noexcept {
    hpx::runtime *rt = hpx::get_runtime_ptr();
    if (rt == nullptr) {
      return 0;
    } else {
      if (hpx::threads::get_self_ptr() == nullptr) {
        return 0;
      } else {
        return hpx::this_thread::get_pool()->get_pool_index();
      }
    }
  }

  static int impl_thread_pool_size(int depth) {
    if (depth == 0) {
      return impl_thread_pool_size();
    } else {
      return 1;
    }
  }

  static int impl_max_hardware_threads() noexcept {
    return hpx::threads::hardware_concurrency();
  }

  static int impl_hardware_thread_id() noexcept {
    return hpx::get_worker_thread_num();
  }

  Kokkos::Impl::thread_buffer &impl_get_buffer() const noexcept {
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
    return m_buffer.get();
#else
    return m_default_buffer;
#endif
  }

#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
  hpx::shared_future<void> &impl_get_future() const noexcept {
    return m_future;
  }

  hpx::spinlock &impl_get_future_mutex() const noexcept {
    return m_future_mutex;
  }
#endif

#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
  struct [[nodiscard]] reset_on_exit_parallel {
    HPX const &m_space;
    reset_on_exit_parallel(HPX const &space) : m_space(space) {}
    ~reset_on_exit_parallel() {
      // See [note 1] for an explanation. m_independent_instance_data is
      // marked mutable.
      m_space.m_independent_instance_data.reset();

      HPX::impl_decrement_active_parallel_region_count();
    }
  };

  // This struct is identical to the above except it does not reset the shared
  // data. It does, however, still decrement the parallel region count. It is
  // meant for use in parallel regions which do not capture the execution space
  // instance.
  struct [[nodiscard]] reset_count_on_exit_parallel {
    reset_count_on_exit_parallel() = default;
    ~reset_count_on_exit_parallel() {
      HPX::impl_decrement_active_parallel_region_count();
    }
  };
#else
  struct [[nodiscard]] reset_on_exit_parallel {
    reset_on_exit_parallel(HPX const &) {}
    ~reset_on_exit_parallel() = default;
  };

  struct [[nodiscard]] reset_count_on_exit_parallel {
    reset_count_on_exit_parallel() = default;
    ~reset_count_on_exit_parallel() = default;
  };
#endif

  static constexpr const char *name() noexcept { return "HPX"; }

 private:
  friend bool operator==(HPX const &lhs, HPX const &rhs) {
    return lhs.m_instance_id == rhs.m_instance_id;
  }
  friend bool operator!=(HPX const &lhs, HPX const &rhs) {
    return !(lhs == rhs);
  }
};
} // namespace Experimental
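For orientation, a sketch of how these instances are meant to be used (hypothetical example; instance_mode and per-instance futures require KOKKOS_ENABLE_HPX_ASYNC_DISPATCH):

void example_instances() {
  Kokkos::Experimental::HPX def;  // shares the default instance data
  Kokkos::Experimental::HPX indep(
      Kokkos::Experimental::HPX::instance_mode::independent);

  // ... dispatch parallel work on def and indep ...

  // Waits only on work sequenced through indep's own future.
  indep.fence("example: fence independent instance");

  // Waits until the global active-parallel-region count drops to zero.
  Kokkos::Experimental::HPX::impl_static_fence("example: fence all");
}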

namespace Tools {
namespace Experimental {
template <>
struct DeviceTypeTraits<Kokkos::Experimental::HPX> {
  static constexpr DeviceType id = DeviceType::HPX;
  static int device_id(const Kokkos::Experimental::HPX &) { return 0; }
};
} // namespace Experimental
} // namespace Tools

namespace Impl {

#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
template <typename Closure>
inline void dispatch_execute_task(Closure *closure,
                                  Kokkos::Experimental::HPX const &instance,
                                  bool force_synchronous = false) {
  Kokkos::Experimental::HPX::impl_increment_active_parallel_region_count();

  Closure closure_copy = *closure;

  {
    std::unique_lock<hpx::spinlock> l(instance.impl_get_future_mutex());
    hpx::util::ignore_lock(&instance.impl_get_future_mutex());
    hpx::shared_future<void> &fut = instance.impl_get_future();

    fut = fut.then(hpx::execution::parallel_executor(
                       hpx::threads::thread_schedule_hint(0)),
                   [closure_copy](hpx::shared_future<void> &&) {
                     return closure_copy.execute_task();
                   });
  }
497
498 if (force_synchronous) {
499 instance.fence(
500 "Kokkos::Experimental::Impl::HPX::dispatch_execute_task: fence due to "
501 "forced syncronizations");
502 }
503}
#else
template <typename Closure>
inline void dispatch_execute_task(Closure *closure,
                                  Kokkos::Experimental::HPX const &,
                                  bool = false) {
  closure->execute_task();
}
#endif
} // namespace Impl
} // namespace Kokkos
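The key line above is fut = fut.then(...): every dispatch replaces the instance future with a continuation of the previous one. A condensed sketch of that sequencing idea (illustrative only):

hpx::shared_future<void> example_sequence(hpx::shared_future<void> fut) {
  // Closures attached to the same instance run in dispatch order, because
  // each one becomes a continuation of the previous future.
  fut = fut.then([](hpx::shared_future<void> &&) { /* closure 1 */ });
  fut = fut.then([](hpx::shared_future<void> &&) { /* closure 2 */ });
  return fut;  // becomes ready only after closure 1, then closure 2, ran
}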

namespace Kokkos {
namespace Impl {
template <>
struct MemorySpaceAccess<Kokkos::Experimental::HPX::memory_space,
                         Kokkos::Experimental::HPX::scratch_memory_space> {
  enum : bool { assignable = false };
  enum : bool { accessible = true };
  enum : bool { deepcopy = false };
};

} // namespace Impl
} // namespace Kokkos
namespace Kokkos {
namespace Experimental {
template <>
class UniqueToken<HPX, UniqueTokenScope::Instance> {
 private:
  using buffer_type = Kokkos::View<uint32_t *, Kokkos::HostSpace>;
  int m_count;
  buffer_type m_buffer_view;
  uint32_t volatile *m_buffer;

 public:
  using execution_space = HPX;
  using size_type = int;

  /// \brief create object size for concurrency on the given instance
  ///
  /// This object should not be shared between instances
  UniqueToken(execution_space const & = execution_space()) noexcept
      : m_count(execution_space::impl_max_hardware_threads()),
        m_buffer_view(buffer_type()),
        m_buffer(nullptr) {}

  UniqueToken(size_type max_size, execution_space const & = execution_space())
      : m_count(max_size > execution_space::impl_max_hardware_threads()
                    ? execution_space::impl_max_hardware_threads()
                    : max_size),
        m_buffer_view(
            max_size > execution_space::impl_max_hardware_threads()
                ? buffer_type()
                : buffer_type("UniqueToken::m_buffer_view",
                              ::Kokkos::Impl::concurrent_bitset::buffer_bound(
                                  m_count))),
        m_buffer(m_buffer_view.data()) {}

  /// \brief upper bound for acquired values, i.e. 0 <= value < size()
  KOKKOS_INLINE_FUNCTION
  int size() const noexcept { return m_count; }

  /// \brief acquire value such that 0 <= value < size()
  KOKKOS_INLINE_FUNCTION
  int acquire() const noexcept {
    KOKKOS_IF_ON_HOST((
        if (m_buffer == nullptr) {
          return execution_space::impl_hardware_thread_id();
        } else {
          const ::Kokkos::pair<int, int> result =
              ::Kokkos::Impl::concurrent_bitset::acquire_bounded(
                  m_buffer, m_count, ::Kokkos::Impl::clock_tic() % m_count);

          if (result.first < 0) {
            Kokkos::abort(
                "UniqueToken<HPX> failure to acquire tokens, no tokens "
                "available");
          }
          return result.first;
        }))

    KOKKOS_IF_ON_DEVICE((return 0;))
  }

  /// \brief release an acquired value
  KOKKOS_INLINE_FUNCTION
  void release(int i) const noexcept {
    KOKKOS_IF_ON_HOST((if (m_buffer != nullptr) {
      ::Kokkos::Impl::concurrent_bitset::release(m_buffer, i);
    }))

    KOKKOS_IF_ON_DEVICE(((void)i;))
  }
};

template <>
class UniqueToken<HPX, UniqueTokenScope::Global> {
 public:
  using execution_space = HPX;
  using size_type = int;
  UniqueToken(execution_space const & = execution_space()) noexcept {}

  // NOTE: Currently this assumes that there is no oversubscription.
  // hpx::get_num_worker_threads can't be used directly because it may yield
  // its task (problematic if called after hpx::get_worker_thread_num).
  int size() const noexcept { return HPX::impl_max_hardware_threads(); }
  int acquire() const noexcept { return HPX::impl_hardware_thread_id(); }
  void release(int) const noexcept {}
};
} // namespace Experimental
} // namespace Kokkos
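A usage sketch (hypothetical example; assumes scratch has at least token.size() entries): tokens index per-thread scratch inside a parallel region and are returned afterwards:

void example_unique_token(Kokkos::View<double *, Kokkos::HostSpace> scratch) {
  Kokkos::Experimental::UniqueToken<Kokkos::Experimental::HPX> token;
  Kokkos::parallel_for(
      "example", Kokkos::RangePolicy<Kokkos::Experimental::HPX>(0, 1000),
      [=](const int) {
        const int id = token.acquire();  // 0 <= id < token.size()
        scratch(id) += 1.0;              // exclusively owned while held
        token.release(id);
      });
}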

namespace Kokkos {
namespace Impl {

struct HPXTeamMember {
 public:
  using execution_space = Kokkos::Experimental::HPX;
  using scratch_memory_space =
      Kokkos::ScratchMemorySpace<Kokkos::Experimental::HPX>;
  using team_handle = HPXTeamMember;

 private:
  scratch_memory_space m_team_shared;

  int m_league_size;
  int m_league_rank;
  int m_team_size;
  int m_team_rank;

 public:
  KOKKOS_INLINE_FUNCTION
  const scratch_memory_space &team_shmem() const {
    return m_team_shared.set_team_thread_mode(0, 1, 0);
  }

  KOKKOS_INLINE_FUNCTION
  const execution_space::scratch_memory_space &team_scratch(const int) const {
    return m_team_shared.set_team_thread_mode(0, 1, 0);
  }

  KOKKOS_INLINE_FUNCTION
  const execution_space::scratch_memory_space &thread_scratch(
      const int) const {
    return m_team_shared.set_team_thread_mode(0, team_size(), team_rank());
  }

  KOKKOS_INLINE_FUNCTION int league_rank() const noexcept {
    return m_league_rank;
  }

  KOKKOS_INLINE_FUNCTION int league_size() const noexcept {
    return m_league_size;
  }

  KOKKOS_INLINE_FUNCTION int team_rank() const noexcept { return m_team_rank; }
  KOKKOS_INLINE_FUNCTION int team_size() const noexcept { return m_team_size; }

  template <class... Properties>
  constexpr KOKKOS_INLINE_FUNCTION HPXTeamMember(
      const TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>
          &policy,
      const int team_rank, const int league_rank, void *scratch,
      size_t scratch_size) noexcept
      : m_team_shared(scratch, scratch_size, scratch, scratch_size),
        m_league_size(policy.league_size()),
        m_league_rank(league_rank),
        m_team_size(policy.team_size()),
        m_team_rank(team_rank) {}

  KOKKOS_INLINE_FUNCTION
  void team_barrier() const {}

  template <class ValueType>
  KOKKOS_INLINE_FUNCTION void team_broadcast(ValueType &, const int &) const {}

  template <class Closure, class ValueType>
  KOKKOS_INLINE_FUNCTION void team_broadcast(const Closure &closure,
                                             ValueType &value,
                                             const int &) const {
    closure(value);
  }

  template <class ValueType, class JoinOp>
  KOKKOS_INLINE_FUNCTION ValueType team_reduce(const ValueType &value,
                                               const JoinOp &) const {
    return value;
  }

  template <class ReducerType>
  KOKKOS_INLINE_FUNCTION std::enable_if_t<is_reducer<ReducerType>::value>
  team_reduce(const ReducerType &) const {}

  template <typename Type>
  KOKKOS_INLINE_FUNCTION Type
  team_scan(const Type &value, Type *const global_accum = nullptr) const {
    if (global_accum) {
      Kokkos::atomic_fetch_add(global_accum, value);
    }

    return 0;
  }
};
template <class... Properties>
class TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>
    : public PolicyTraits<Properties...> {
  int m_league_size;
  int m_team_size;
  std::size_t m_team_scratch_size[2];
  std::size_t m_thread_scratch_size[2];
  int m_chunk_size;

 public:
  using traits = PolicyTraits<Properties...>;

  using execution_policy = TeamPolicyInternal;

  using member_type = HPXTeamMember;

  using execution_space = Kokkos::Experimental::HPX;

  // NOTE: Max size is 1 for simplicity. In most cases more than 1 is not
  // necessary on CPU. Implement later if there is a need.
  template <class FunctorType>
  inline static int team_size_max(const FunctorType &) {
    return 1;
  }

  template <class FunctorType>
  inline static int team_size_recommended(const FunctorType &) {
    return 1;
  }

  template <class FunctorType>
  inline static int team_size_recommended(const FunctorType &, const int &) {
    return 1;
  }

  template <class FunctorType>
  int team_size_max(const FunctorType &, const ParallelForTag &) const {
    return 1;
  }

  template <class FunctorType>
  int team_size_max(const FunctorType &, const ParallelReduceTag &) const {
    return 1;
  }

  template <class FunctorType, class ReducerType>
  int team_size_max(const FunctorType &, const ReducerType &,
                    const ParallelReduceTag &) const {
    return 1;
  }

  template <class FunctorType>
  int team_size_recommended(const FunctorType &, const ParallelForTag &) const {
    return 1;
  }

  template <class FunctorType>
  int team_size_recommended(const FunctorType &,
                            const ParallelReduceTag &) const {
    return 1;
  }

  template <class FunctorType, class ReducerType>
  int team_size_recommended(const FunctorType &, const ReducerType &,
                            const ParallelReduceTag &) const {
    return 1;
  }

  static int vector_length_max() { return 1; }

  inline int impl_vector_length() noexcept { return 1; }
  inline bool impl_auto_team_size() noexcept { return false; }
  inline bool impl_auto_vector_length() noexcept { return false; }
  inline void impl_set_vector_length(int) noexcept {}
  inline void impl_set_team_size(int) noexcept {}

 private:
  inline void init(const int league_size_request, const int team_size_request) {
    m_league_size = league_size_request;
    const int max_team_size = 1;  // TODO: Can't use team_size_max(...) because
                                  // it requires a functor as argument.
    m_team_size =
        team_size_request > max_team_size ? max_team_size : team_size_request;

    if (m_chunk_size > 0) {
      if (!Impl::is_integral_power_of_two(m_chunk_size))
        Kokkos::abort("TeamPolicy blocking granularity must be power of two");
    } else {
      int new_chunk_size = 1;
      while (new_chunk_size * 4 * Kokkos::Experimental::HPX::concurrency() <
             m_league_size) {
        new_chunk_size *= 2;
      }

      if (new_chunk_size < 128) {
        new_chunk_size = 1;
        while ((new_chunk_size * Kokkos::Experimental::HPX::concurrency() <
                m_league_size) &&
               (new_chunk_size < 128))
          new_chunk_size *= 2;
      }

      m_chunk_size = new_chunk_size;
    }
  }

 public:
  inline int team_size() const { return m_team_size; }
  inline int league_size() const { return m_league_size; }

  size_t scratch_size(const int &level, int team_size_ = -1) const {
    if (team_size_ < 0) {
      team_size_ = m_team_size;
    }
    return m_team_scratch_size[level] +
           team_size_ * m_thread_scratch_size[level];
  }

  inline static int scratch_size_max(int level) {
    return (level == 0 ? 1024 * 32 :  // Roughly L1 size
                20 * 1024 * 1024);    // Limit to keep compatibility with CUDA
  }

 public:
  template <class ExecSpace, class... OtherProperties>
  friend class TeamPolicyInternal;

  const typename traits::execution_space &space() const {
    static typename traits::execution_space m_space;
    return m_space;
  }

  template <class... OtherProperties>
  TeamPolicyInternal(const TeamPolicyInternal<Kokkos::Experimental::HPX,
                                              OtherProperties...> &p) {
    m_league_size = p.m_league_size;
    m_team_size = p.m_team_size;
    m_team_scratch_size[0] = p.m_team_scratch_size[0];
    m_thread_scratch_size[0] = p.m_thread_scratch_size[0];
    m_team_scratch_size[1] = p.m_team_scratch_size[1];
    m_thread_scratch_size[1] = p.m_thread_scratch_size[1];
    m_chunk_size = p.m_chunk_size;
  }

  TeamPolicyInternal(const typename traits::execution_space &,
                     int league_size_request, int team_size_request,
                     int /* vector_length_request */ = 1)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, team_size_request);
  }

  TeamPolicyInternal(const typename traits::execution_space &,
                     int league_size_request, const Kokkos::AUTO_t &,
                     int /* vector_length_request */ = 1)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, 1);
  }

  TeamPolicyInternal(const typename traits::execution_space &,
                     int league_size_request,
                     const Kokkos::AUTO_t &, /* team_size_request */
                     const Kokkos::AUTO_t & /* vector_length_request */)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, 1);
  }

  TeamPolicyInternal(const typename traits::execution_space &,
                     int league_size_request, int team_size_request,
                     const Kokkos::AUTO_t & /* vector_length_request */)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, team_size_request);
  }

  TeamPolicyInternal(int league_size_request,
                     const Kokkos::AUTO_t &, /* team_size_request */
                     const Kokkos::AUTO_t & /* vector_length_request */)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, 1);
  }

  TeamPolicyInternal(int league_size_request, int team_size_request,
                     const Kokkos::AUTO_t & /* vector_length_request */)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, team_size_request);
  }

  TeamPolicyInternal(int league_size_request, int team_size_request,
                     int /* vector_length_request */ = 1)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, team_size_request);
  }

  TeamPolicyInternal(int league_size_request, const Kokkos::AUTO_t &,
                     int /* vector_length_request */ = 1)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, 1);
  }

  inline int chunk_size() const { return m_chunk_size; }

  inline TeamPolicyInternal &set_chunk_size(
      typename traits::index_type chunk_size_) {
    m_chunk_size = chunk_size_;
    return *this;
  }

  inline TeamPolicyInternal &set_scratch_size(const int &level,
                                              const PerTeamValue &per_team) {
    m_team_scratch_size[level] = per_team.value;
    return *this;
  }

  inline TeamPolicyInternal &set_scratch_size(
      const int &level, const PerThreadValue &per_thread) {
    m_thread_scratch_size[level] = per_thread.value;
    return *this;
  }

  inline TeamPolicyInternal &set_scratch_size(
      const int &level, const PerTeamValue &per_team,
      const PerThreadValue &per_thread) {
    m_team_scratch_size[level] = per_team.value;
    m_thread_scratch_size[level] = per_thread.value;
    return *this;
  }
};
} // namespace Impl
} // namespace Kokkos
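A brief usage sketch for the policy above (hypothetical example; team_size_max is 1 on this backend, so leagues are the unit of parallelism and team_rank() is always 0):

void example_team_policy() {
  using policy_type = Kokkos::TeamPolicy<Kokkos::Experimental::HPX>;
  Kokkos::parallel_for(
      "example_teams",
      policy_type(100, 1).set_scratch_size(0, Kokkos::PerTeam(1024)),
      [=](const policy_type::member_type &member) {
        // Each league rank runs as one chunk of the for_loop dispatch;
        // team_scratch(0) returns the per-team scratch requested above.
        (void)member.league_rank();
      });
}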

namespace Kokkos {
namespace Impl {

template <typename Policy>
typename Policy::member_type get_hpx_adjusted_chunk_size(Policy const &policy) {
  const int concurrency = Kokkos::Experimental::HPX::concurrency();
  const typename Policy::member_type n = policy.end() - policy.begin();
  typename Policy::member_type new_chunk_size = policy.chunk_size();

  while (n >= 4 * concurrency * new_chunk_size) {
    new_chunk_size *= 2;
  }

  return new_chunk_size;
}
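Worked example of the heuristic above (illustrative numbers, assuming a range of n = 100000 iterations, concurrency = 16, and an initial chunk size of 1): the loop doubles the chunk size while n >= 4 * concurrency * chunk_size, i.e. while 64 * chunk_size <= 100000, so it stops at chunk_size = 2048. That yields roughly 49 chunks for 16 workers; the doubling stops once every worker has about two to four chunks, which leaves slack for load balancing without excessive per-task overhead.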

template <class FunctorType, class... Traits>
class ParallelFor<FunctorType, Kokkos::RangePolicy<Traits...>,
                  Kokkos::Experimental::HPX> {
 private:
  using Policy = Kokkos::RangePolicy<Traits...>;
  using WorkTag = typename Policy::work_tag;
  using WorkRange = typename Policy::WorkRange;
  using Member = typename Policy::member_type;

  const FunctorType m_functor;
  const Policy m_policy;

  template <class TagType>
  static std::enable_if_t<std::is_void<TagType>::value> execute_functor(
      const FunctorType &functor, const Member i) {
    functor(i);
  }

  template <class TagType>
  static std::enable_if_t<!std::is_void<TagType>::value> execute_functor(
      const FunctorType &functor, const Member i) {
    const TagType t{};
    functor(t, i);
  }

  template <class TagType>
  static std::enable_if_t<std::is_void<TagType>::value> execute_functor_range(
      const FunctorType &functor, const Member i_begin, const Member i_end) {
    for (Member i = i_begin; i < i_end; ++i) {
      functor(i);
    }
  }

  template <class TagType>
  static std::enable_if_t<!std::is_void<TagType>::value> execute_functor_range(
      const FunctorType &functor, const Member i_begin, const Member i_end) {
    const TagType t{};
    for (Member i = i_begin; i < i_end; ++i) {
      functor(t, i);
    }
  }

 public:
  void execute() const {
    Kokkos::Impl::dispatch_execute_task(this, m_policy.space());
  }

  void execute_task() const {
    // See [note 1] for an explanation.
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_policy.space());

    auto exec = Kokkos::Experimental::HPX::impl_get_executor();

    using hpx::execution::par;
    using hpx::execution::static_chunk_size;

#if KOKKOS_HPX_IMPLEMENTATION == 0
    using hpx::for_loop;

    for_loop(par.on(exec).with(static_chunk_size(m_policy.chunk_size())),
             m_policy.begin(), m_policy.end(), [this](const Member i) {
               execute_functor<WorkTag>(m_functor, i);
             });

#elif KOKKOS_HPX_IMPLEMENTATION == 1
    using hpx::for_loop_strided;

    const Member chunk_size = get_hpx_adjusted_chunk_size(m_policy);

    for_loop_strided(
        par.on(exec), m_policy.begin(), m_policy.end(), chunk_size,
        [this, chunk_size](const Member i_begin) {
          const Member i_end = (std::min)(i_begin + chunk_size, m_policy.end());
          execute_functor_range<WorkTag>(m_functor, i_begin, i_end);
        });
#endif
  }

  inline ParallelFor(const FunctorType &arg_functor, Policy arg_policy)
      : m_functor(arg_functor), m_policy(arg_policy) {}
};
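As a usage sketch (hypothetical example), this specialization is what a plain range parallel_for on the HPX execution space instantiates; execute() above queues it through dispatch_execute_task:

void example_range_for(Kokkos::View<double *, Kokkos::HostSpace> v) {
  Kokkos::parallel_for(
      "axpb", Kokkos::RangePolicy<Kokkos::Experimental::HPX>(0, v.extent(0)),
      [=](const int i) { v(i) = 2.0 * v(i) + 1.0; });
}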

template <class FunctorType, class... Traits>
class ParallelFor<FunctorType, Kokkos::MDRangePolicy<Traits...>,
                  Kokkos::Experimental::HPX> {
 private:
  using MDRangePolicy = Kokkos::MDRangePolicy<Traits...>;
  using Policy = typename MDRangePolicy::impl_range_policy;
  using WorkTag = typename MDRangePolicy::work_tag;
  using WorkRange = typename Policy::WorkRange;
  using Member = typename Policy::member_type;
  using iterate_type =
      typename Kokkos::Impl::HostIterateTile<MDRangePolicy, FunctorType,
                                             WorkTag, void>;

  const iterate_type m_iter;
  const Policy m_policy;

 public:
  void execute() const { dispatch_execute_task(this, m_iter.m_rp.space()); }

  inline void execute_task() const {
    // See [note 1] for an explanation.
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_iter.m_rp.space());

    auto exec = Kokkos::Experimental::HPX::impl_get_executor();

    using hpx::execution::par;
    using hpx::execution::static_chunk_size;

#if KOKKOS_HPX_IMPLEMENTATION == 0
    using hpx::for_loop;

    for_loop(par.on(exec).with(
                 static_chunk_size(get_hpx_adjusted_chunk_size(m_policy))),
             m_policy.begin(), m_policy.end(),
             [this](const Member i) { m_iter(i); });

#elif KOKKOS_HPX_IMPLEMENTATION == 1
    using hpx::for_loop_strided;

    const Member chunk_size = get_hpx_adjusted_chunk_size(m_policy);

    for_loop_strided(par.on(exec), m_policy.begin(), m_policy.end(),
                     chunk_size, [this, chunk_size](const Member i_begin) {
                       const Member i_end =
                           (std::min)(i_begin + chunk_size, m_policy.end());
                       for (Member i = i_begin; i < i_end; ++i) {
                         m_iter(i);
                       }
                     });
#endif
  }

  inline ParallelFor(const FunctorType &arg_functor, MDRangePolicy arg_policy)
      : m_iter(arg_policy, arg_functor),
        m_policy(Policy(0, arg_policy.m_num_tiles).set_chunk_size(1)) {}
  template <typename Policy, typename Functor>
  static int max_tile_size_product(const Policy &, const Functor &) {
    return 1024;
  }
};
} // namespace Impl
} // namespace Kokkos

namespace Kokkos {
namespace Impl {
template <class FunctorType, class ReducerType, class... Traits>
class ParallelReduce<FunctorType, Kokkos::RangePolicy<Traits...>, ReducerType,
                     Kokkos::Experimental::HPX> {
 private:
  using Policy = Kokkos::RangePolicy<Traits...>;
  using WorkTag = typename Policy::work_tag;
  using WorkRange = typename Policy::WorkRange;
  using Member = typename Policy::member_type;
  using ReducerConditional =
      Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
                         FunctorType, ReducerType>;
  using ReducerTypeFwd = typename ReducerConditional::type;
  using Analysis =
      FunctorAnalysis<FunctorPatternInterface::REDUCE, Policy, ReducerTypeFwd>;
  using value_type = typename Analysis::value_type;
  using pointer_type = typename Analysis::pointer_type;
  using reference_type = typename Analysis::reference_type;

  const FunctorType m_functor;
  const Policy m_policy;
  const ReducerType m_reducer;
  const pointer_type m_result_ptr;

  bool m_force_synchronous;

  template <class TagType>
  inline static std::enable_if_t<std::is_void<TagType>::value> execute_functor(
      const FunctorType &functor, const Member i, reference_type update) {
    functor(i, update);
  }

  template <class TagType>
  inline static std::enable_if_t<!std::is_void<TagType>::value>
  execute_functor(const FunctorType &functor, const Member i,
                  reference_type update) {
    const TagType t{};
    functor(t, i, update);
  }

  template <class TagType>
  inline std::enable_if_t<std::is_void<TagType>::value> execute_functor_range(
      reference_type update, const Member i_begin, const Member i_end) const {
    for (Member i = i_begin; i < i_end; ++i) {
      m_functor(i, update);
    }
  }

  template <class TagType>
  inline std::enable_if_t<!std::is_void<TagType>::value> execute_functor_range(
      reference_type update, const Member i_begin, const Member i_end) const {
    const TagType t{};

    for (Member i = i_begin; i < i_end; ++i) {
      m_functor(t, i, update);
    }
  }

  class value_type_wrapper {
   private:
    std::size_t m_value_size;
    char *m_value_buffer;

   public:
    value_type_wrapper() : m_value_size(0), m_value_buffer(nullptr) {}

    value_type_wrapper(const std::size_t value_size)
        : m_value_size(value_size), m_value_buffer(new char[m_value_size]) {}

    value_type_wrapper(const value_type_wrapper &other)
        : m_value_size(0), m_value_buffer(nullptr) {
      if (this != &other) {
        m_value_buffer = new char[other.m_value_size];
        m_value_size = other.m_value_size;

        std::copy(other.m_value_buffer, other.m_value_buffer + m_value_size,
                  m_value_buffer);
      }
    }

    ~value_type_wrapper() { delete[] m_value_buffer; }

    value_type_wrapper(value_type_wrapper &&other)
        : m_value_size(0), m_value_buffer(nullptr) {
      if (this != &other) {
        m_value_buffer = other.m_value_buffer;
        m_value_size = other.m_value_size;

        other.m_value_buffer = nullptr;
        other.m_value_size = 0;
      }
    }

    value_type_wrapper &operator=(const value_type_wrapper &other) {
      if (this != &other) {
        delete[] m_value_buffer;
        m_value_buffer = new char[other.m_value_size];
        m_value_size = other.m_value_size;

        std::copy(other.m_value_buffer, other.m_value_buffer + m_value_size,
                  m_value_buffer);
      }

      return *this;
    }

    value_type_wrapper &operator=(value_type_wrapper &&other) {
      if (this != &other) {
        delete[] m_value_buffer;
        m_value_buffer = other.m_value_buffer;
        m_value_size = other.m_value_size;

        other.m_value_buffer = nullptr;
        other.m_value_size = 0;
      }

      return *this;
    }

    pointer_type pointer() const {
      return reinterpret_cast<pointer_type>(m_value_buffer);
    }

    reference_type reference() const {
      return Analysis::Reducer::reference(
          reinterpret_cast<pointer_type>(m_value_buffer));
    }
  };

 public:
  void execute() const {
    if (m_policy.end() <= m_policy.begin()) {
      if (m_result_ptr) {
        typename Analysis::Reducer final_reducer(
            &ReducerConditional::select(m_functor, m_reducer));

        final_reducer.init(m_result_ptr);
        final_reducer.final(m_result_ptr);
      }
      return;
    }
    dispatch_execute_task(this, m_policy.space(), m_force_synchronous);
  }

  inline void execute_task() const {
    // See [note 1] for an explanation.
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_policy.space());

    typename Analysis::Reducer final_reducer(
        &ReducerConditional::select(m_functor, m_reducer));

    const std::size_t value_size =
        Analysis::value_size(ReducerConditional::select(m_functor, m_reducer));

    auto exec = Kokkos::Experimental::HPX::impl_get_executor();

    using hpx::for_loop;
    using hpx::execution::par;
    using hpx::execution::static_chunk_size;

#if KOKKOS_HPX_IMPLEMENTATION == 0
    // NOTE: This version makes the most use of HPX functionality, but
    // requires the struct value_type_wrapper to handle different
    // reference_types. It is also significantly slower than the version
    // below due to not reusing the buffer used by other functions.
    using hpx::parallel::reduction;

    value_type_wrapper final_value(value_size);
    value_type_wrapper identity(value_size);

    final_reducer.init(final_value.pointer());
    final_reducer.init(identity.pointer());

    for_loop(par.on(exec).with(
                 static_chunk_size(get_hpx_adjusted_chunk_size(m_policy))),
             m_policy.begin(), m_policy.end(),
             reduction(final_value, identity,
                       [final_reducer](
                           value_type_wrapper &a,
                           value_type_wrapper &b) -> value_type_wrapper & {
                         final_reducer.join(a.pointer(), b.pointer());
                         return a;
                       }),
             [this](Member i, value_type_wrapper &update) {
               execute_functor<WorkTag>(m_functor, i, update.reference());
             });

    pointer_type final_value_ptr = final_value.pointer();

#elif KOKKOS_HPX_IMPLEMENTATION == 1
    using hpx::for_loop_strided;

    const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();

    thread_buffer &buffer = m_policy.space().impl_get_buffer();
    buffer.resize(num_worker_threads, value_size);

    for_loop(par.on(exec).with(static_chunk_size(1)), 0, num_worker_threads,
             [&buffer, final_reducer](const int t) noexcept {
               final_reducer.init(
                   reinterpret_cast<pointer_type>(buffer.get(t)));
             });

    const Member chunk_size = get_hpx_adjusted_chunk_size(m_policy);

    for_loop_strided(
        par.on(exec), m_policy.begin(), m_policy.end(), chunk_size,
        [this, &buffer, chunk_size](const Member i_begin) {
          reference_type update = Analysis::Reducer::reference(
              reinterpret_cast<pointer_type>(buffer.get(
                  Kokkos::Experimental::HPX::impl_hardware_thread_id())));
          const Member i_end = (std::min)(i_begin + chunk_size, m_policy.end());
          execute_functor_range<WorkTag>(update, i_begin, i_end);
        });

    for (int i = 1; i < num_worker_threads; ++i) {
      final_reducer.join(reinterpret_cast<pointer_type>(buffer.get(0)),
                         reinterpret_cast<pointer_type>(buffer.get(i)));
    }

    pointer_type final_value_ptr =
        reinterpret_cast<pointer_type>(buffer.get(0));
#endif

    final_reducer.final(final_value_ptr);

    if (m_result_ptr != nullptr) {
      const int n = Analysis::value_count(
          ReducerConditional::select(m_functor, m_reducer));

      for (int j = 0; j < n; ++j) {
        m_result_ptr[j] = final_value_ptr[j];
      }
    }
  }

  template <class ViewType>
  inline ParallelReduce(
      const FunctorType &arg_functor, Policy arg_policy,
      const ViewType &arg_view,
      std::enable_if_t<Kokkos::is_view<ViewType>::value &&
                           !Kokkos::is_reducer<ViewType>::value,
                       void *> = nullptr)
      : m_functor(arg_functor),
        m_policy(arg_policy),
        m_reducer(InvalidType()),
        m_result_ptr(arg_view.data()),
        m_force_synchronous(!arg_view.impl_track().has_record()) {}

  inline ParallelReduce(const FunctorType &arg_functor, Policy arg_policy,
                        const ReducerType &reducer)
      : m_functor(arg_functor),
        m_policy(arg_policy),
        m_reducer(reducer),
        m_result_ptr(reducer.view().data()),
        m_force_synchronous(!reducer.view().impl_track().has_record()) {}
};
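A usage sketch (hypothetical example): a scalar sum reduction lands in this specialization; each worker accumulates into its thread_buffer slot and slot 0 is joined serially in execute_task:

double example_range_reduce(Kokkos::View<double *, Kokkos::HostSpace> v) {
  double sum = 0.0;
  Kokkos::parallel_reduce(
      "sum", Kokkos::RangePolicy<Kokkos::Experimental::HPX>(0, v.extent(0)),
      [=](const int i, double &update) { update += v(i); }, sum);
  // A scalar result has no tracking record, so m_force_synchronous is set
  // and the dispatch fences before returning.
  return sum;
}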

template <class FunctorType, class ReducerType, class... Traits>
class ParallelReduce<FunctorType, Kokkos::MDRangePolicy<Traits...>, ReducerType,
                     Kokkos::Experimental::HPX> {
 private:
  using MDRangePolicy = Kokkos::MDRangePolicy<Traits...>;
  using Policy = typename MDRangePolicy::impl_range_policy;
  using WorkTag = typename MDRangePolicy::work_tag;
  using WorkRange = typename Policy::WorkRange;
  using Member = typename Policy::member_type;
  using ReducerConditional =
      Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
                         FunctorType, ReducerType>;
  using ReducerTypeFwd = typename ReducerConditional::type;
  using Analysis = FunctorAnalysis<FunctorPatternInterface::REDUCE,
                                   MDRangePolicy, ReducerTypeFwd>;

  using pointer_type = typename Analysis::pointer_type;
  using value_type = typename Analysis::value_type;
  using reference_type = typename Analysis::reference_type;
  using iterate_type =
      typename Kokkos::Impl::HostIterateTile<MDRangePolicy, FunctorType,
                                             WorkTag, reference_type>;

  const iterate_type m_iter;
  const Policy m_policy;
  const ReducerType m_reducer;
  const pointer_type m_result_ptr;

  bool m_force_synchronous;

 public:
  void execute() const {
    dispatch_execute_task(this, m_iter.m_rp.space(), m_force_synchronous);
  }

  inline void execute_task() const {
    // See [note 1] for an explanation.
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_iter.m_rp.space());

    const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
    const std::size_t value_size = Analysis::value_size(
        ReducerConditional::select(m_iter.m_func, m_reducer));

    thread_buffer &buffer = m_iter.m_rp.space().impl_get_buffer();
    buffer.resize(num_worker_threads, value_size);

    using hpx::for_loop;
    using hpx::execution::par;
    using hpx::execution::static_chunk_size;

    auto exec = Kokkos::Experimental::HPX::impl_get_executor();

    typename Analysis::Reducer final_reducer(
        &ReducerConditional::select(m_iter.m_func, m_reducer));

#if KOKKOS_HPX_IMPLEMENTATION == 0

    for_loop(par.on(exec).with(static_chunk_size(1)), 0, num_worker_threads,
             [&buffer, final_reducer](std::size_t t) {
               final_reducer.init(
                   reinterpret_cast<pointer_type>(buffer.get(t)));
             });

    for_loop(par.on(exec).with(
                 static_chunk_size(get_hpx_adjusted_chunk_size(m_policy))),
             m_policy.begin(), m_policy.end(), [this, &buffer](const Member i) {
               reference_type update = Analysis::Reducer::reference(
                   reinterpret_cast<pointer_type>(buffer.get(
                       Kokkos::Experimental::HPX::impl_hardware_thread_id())));
               m_iter(i, update);
             });

#elif KOKKOS_HPX_IMPLEMENTATION == 1
    using hpx::for_loop_strided;

    for_loop(par.on(exec).with(static_chunk_size(1)), std::size_t(0),
             num_worker_threads, [&buffer, final_reducer](const std::size_t t) {
               final_reducer.init(
                   reinterpret_cast<pointer_type>(buffer.get(t)));
             });

    const Member chunk_size = get_hpx_adjusted_chunk_size(m_policy);

    for_loop_strided(
        par.on(exec), m_policy.begin(), m_policy.end(), chunk_size,
        [this, &buffer, chunk_size](const Member i_begin) {
          reference_type update = Analysis::Reducer::reference(
              reinterpret_cast<pointer_type>(buffer.get(
                  Kokkos::Experimental::HPX::impl_hardware_thread_id())));
          const Member i_end = (std::min)(i_begin + chunk_size, m_policy.end());

          for (Member i = i_begin; i < i_end; ++i) {
            m_iter(i, update);
          }
        });
#endif

    for (int i = 1; i < num_worker_threads; ++i) {
      final_reducer.join(reinterpret_cast<pointer_type>(buffer.get(0)),
                         reinterpret_cast<pointer_type>(buffer.get(i)));
    }

    final_reducer.final(reinterpret_cast<pointer_type>(buffer.get(0)));

    if (m_result_ptr != nullptr) {
      const int n = Analysis::value_count(
          ReducerConditional::select(m_iter.m_func, m_reducer));

      for (int j = 0; j < n; ++j) {
        m_result_ptr[j] = reinterpret_cast<pointer_type>(buffer.get(0))[j];
      }
    }
  }

  template <class ViewType>
  inline ParallelReduce(
      const FunctorType &arg_functor, MDRangePolicy arg_policy,
      const ViewType &arg_view,
      std::enable_if_t<Kokkos::is_view<ViewType>::value &&
                           !Kokkos::is_reducer<ViewType>::value,
                       void *> = nullptr)
      : m_iter(arg_policy, arg_functor),
        m_policy(Policy(0, arg_policy.m_num_tiles).set_chunk_size(1)),
        m_reducer(InvalidType()),
        m_result_ptr(arg_view.data()),
        m_force_synchronous(!arg_view.impl_track().has_record()) {}

  inline ParallelReduce(const FunctorType &arg_functor,
                        MDRangePolicy arg_policy, const ReducerType &reducer)
      : m_iter(arg_policy, arg_functor),
        m_policy(Policy(0, arg_policy.m_num_tiles).set_chunk_size(1)),
        m_reducer(reducer),
        m_result_ptr(reducer.view().data()),
        m_force_synchronous(!reducer.view().impl_track().has_record()) {}
  template <typename Policy, typename Functor>
  static int max_tile_size_product(const Policy &, const Functor &) {
    return 1024;
  }
};
} // namespace Impl
} // namespace Kokkos

namespace Kokkos {
namespace Impl {

template <class FunctorType, class... Traits>
class ParallelScan<FunctorType, Kokkos::RangePolicy<Traits...>,
                   Kokkos::Experimental::HPX> {
 private:
  using Policy = Kokkos::RangePolicy<Traits...>;
  using WorkTag = typename Policy::work_tag;
  using WorkRange = typename Policy::WorkRange;
  using Member = typename Policy::member_type;
  using Analysis =
      FunctorAnalysis<FunctorPatternInterface::SCAN, Policy, FunctorType>;
  using pointer_type = typename Analysis::pointer_type;
  using reference_type = typename Analysis::reference_type;
  using value_type = typename Analysis::value_type;

  const FunctorType m_functor;
  const Policy m_policy;

  template <class TagType>
  inline static std::enable_if_t<std::is_void<TagType>::value>
  execute_functor_range(const FunctorType &functor, const Member i_begin,
                        const Member i_end, reference_type update,
                        const bool final) {
    for (Member i = i_begin; i < i_end; ++i) {
      functor(i, update, final);
    }
  }

  template <class TagType>
  inline static std::enable_if_t<!std::is_void<TagType>::value>
  execute_functor_range(const FunctorType &functor, const Member i_begin,
                        const Member i_end, reference_type update,
                        const bool final) {
    const TagType t{};
    for (Member i = i_begin; i < i_end; ++i) {
      functor(t, i, update, final);
    }
  }

 public:
  void execute() const { dispatch_execute_task(this, m_policy.space()); }

  inline void execute_task() const {
    // See [note 1] for an explanation.
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_policy.space());

    const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
    const int value_count = Analysis::value_count(m_functor);
    const std::size_t value_size = Analysis::value_size(m_functor);

    thread_buffer &buffer = m_policy.space().impl_get_buffer();
    buffer.resize(num_worker_threads, 2 * value_size);

    using hpx::barrier;
    using hpx::for_loop;
    using hpx::execution::par;
    using hpx::execution::static_chunk_size;

    barrier<> bar(num_worker_threads);
    auto exec = Kokkos::Experimental::HPX::impl_get_executor();

    typename Analysis::Reducer final_reducer(&m_functor);

    for_loop(
        par.on(exec).with(static_chunk_size(1)), 0, num_worker_threads,
        [this, &bar, &buffer, num_worker_threads, value_count, value_size,
         final_reducer](int t) {
          reference_type update_sum =
              final_reducer.init(reinterpret_cast<pointer_type>(buffer.get(t)));

          const WorkRange range(m_policy, t, num_worker_threads);
          execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
                                         update_sum, false);

          bar.arrive_and_wait();

          if (t == 0) {
            final_reducer.init(
                reinterpret_cast<pointer_type>(buffer.get(0) + value_size));

            for (int i = 1; i < num_worker_threads; ++i) {
              pointer_type ptr_1_prev =
                  reinterpret_cast<pointer_type>(buffer.get(i - 1));
              pointer_type ptr_2_prev = reinterpret_cast<pointer_type>(
                  buffer.get(i - 1) + value_size);
              pointer_type ptr_2 =
                  reinterpret_cast<pointer_type>(buffer.get(i) + value_size);

              for (int j = 0; j < value_count; ++j) {
                ptr_2[j] = ptr_2_prev[j];
              }

              final_reducer.join(ptr_2, ptr_1_prev);
            }
          }

          bar.arrive_and_wait();

          reference_type update_base = Analysis::Reducer::reference(
              reinterpret_cast<pointer_type>(buffer.get(t) + value_size));

          execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
                                         update_base, true);
        });
  }

  inline ParallelScan(const FunctorType &arg_functor, const Policy &arg_policy)
      : m_functor(arg_functor), m_policy(arg_policy) {}
};
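A worked illustration of the two-pass scheme above (illustrative numbers): summing twelve ones split over three workers, the first pass (final == false) leaves per-worker totals 4, 4, 4 in the first half of each buffer slot; the serial step on worker 0 converts these into exclusive offsets 0, 4, 8 stored in the second half of each slot; the second pass (final == true) rescans each sub-range starting from its offset, producing the global prefix sums 0 through 11.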
1643
1644template <class FunctorType, class ReturnType, class... Traits>
1645class ParallelScanWithTotal<FunctorType, Kokkos::RangePolicy<Traits...>,
1646 ReturnType, Kokkos::Experimental::HPX> {
1647 private:
1648 using Policy = Kokkos::RangePolicy<Traits...>;
1649 using WorkTag = typename Policy::work_tag;
1650 using WorkRange = typename Policy::WorkRange;
1651 using Member = typename Policy::member_type;
1652 using Analysis =
1653 FunctorAnalysis<FunctorPatternInterface::SCAN, Policy, FunctorType>;
1654 using pointer_type = typename Analysis::pointer_type;
1655 using reference_type = typename Analysis::reference_type;
1656 using value_type = typename Analysis::value_type;
1657
1658 const FunctorType m_functor;
1659 const Policy m_policy;
1660 pointer_type m_result_ptr;
1661
1662 template <class TagType>
1663 inline static std::enable_if_t<std::is_void<TagType>::value>
1664 execute_functor_range(const FunctorType &functor, const Member i_begin,
1665 const Member i_end, reference_type update,
1666 const bool final) {
1667 for (Member i = i_begin; i < i_end; ++i) {
1668 functor(i, update, final);
1669 }
1670 }
1671
1672 template <class TagType>
1673 inline static std::enable_if_t<!std::is_void<TagType>::value>
1674 execute_functor_range(const FunctorType &functor, const Member i_begin,
1675 const Member i_end, reference_type update,
1676 const bool final) {
1677 const TagType t{};
1678 for (Member i = i_begin; i < i_end; ++i) {
1679 functor(t, i, update, final);
1680 }
1681 }
1682
1683 public:
1684 void execute() const { dispatch_execute_task(this, m_policy.space()); }
1685
1686 inline void execute_task() const {
1687 // See [note 1] for an explanation.
1688 Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
1689 m_policy.space());
1690
1691 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1692 const int value_count = Analysis::value_count(m_functor);
1693 const std::size_t value_size = Analysis::value_size(m_functor);
1694
1695 thread_buffer &buffer = m_policy.space().impl_get_buffer();
1696 buffer.resize(num_worker_threads, 2 * value_size);
1697
1698 using hpx::barrier;
1699 using hpx::for_loop;
1700 using hpx::execution::par;
1701 using hpx::execution::static_chunk_size;
1702
1703 barrier<> bar(num_worker_threads);
1704 auto exec = Kokkos::Experimental::HPX::impl_get_executor();
1705
1706 typename Analysis::Reducer final_reducer(&m_functor);
1707
1708 for_loop(
1709 par.on(exec).with(static_chunk_size(1)), 0, num_worker_threads,
1710 [this, &bar, &buffer, num_worker_threads, value_count, value_size,
1711 final_reducer](int t) {
1712 reference_type update_sum =
1713 final_reducer.init(reinterpret_cast<pointer_type>(buffer.get(t)));
1714
1715 const WorkRange range(m_policy, t, num_worker_threads);
1716 execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
1717 update_sum, false);
1718
1719 bar.arrive_and_wait();
1720
1721 if (t == 0) {
1722 final_reducer.init(
1723 reinterpret_cast<pointer_type>(buffer.get(0) + value_size));
1724
1725 for (int i = 1; i < num_worker_threads; ++i) {
1726 pointer_type ptr_1_prev =
1727 reinterpret_cast<pointer_type>(buffer.get(i - 1));
1728 pointer_type ptr_2_prev = reinterpret_cast<pointer_type>(
1729 buffer.get(i - 1) + value_size);
1730 pointer_type ptr_2 =
1731 reinterpret_cast<pointer_type>(buffer.get(i) + value_size);
1732
1733 for (int j = 0; j < value_count; ++j) {
1734 ptr_2[j] = ptr_2_prev[j];
1735 }
1736
1737 final_reducer.join(ptr_2, ptr_1_prev);
1738 }
1739 }
1740
1741 bar.arrive_and_wait();
1742
1743 reference_type update_base = Analysis::Reducer::reference(
1744 reinterpret_cast<pointer_type>(buffer.get(t) + value_size));
1745
1746 execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
1747 update_base, true);
1748
1749 if (t == num_worker_threads - 1) {
1750 *m_result_ptr = update_base;
1751 }
1752 });
1753 }
1754
1755 template <class ViewType>
1756 ParallelScanWithTotal(const FunctorType &arg_functor,
1757 const Policy &arg_policy,
1758 const ViewType &arg_result_view)
1759 : m_functor(arg_functor),
1760 m_policy(arg_policy),
1761 m_result_ptr(arg_result_view.data()) {
1762 static_assert(
1763 Kokkos::Impl::MemorySpaceAccess<typename ViewType::memory_space,
1764 Kokkos::HostSpace>::accessible,
1765 "Kokkos::HPX parallel_scan result must be host-accessible!");
1766 }
1767};
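// Example: an exclusive prefix sum that also returns the grand total,
// dispatched through ParallelScanWithTotal above. A minimal sketch, assuming
// Kokkos is initialized with the HPX backend and "n" is defined by the caller:
//
//   Kokkos::View<int *, Kokkos::HostSpace> dst("dst", n);
//   int total = 0;
//   Kokkos::parallel_scan(
//       Kokkos::RangePolicy<Kokkos::Experimental::HPX>(0, n),
//       KOKKOS_LAMBDA(const int i, int &update, const bool final) {
//         if (final) dst(i) = update;  // exclusive prefix sum of 1s
//         update += 1;                 // contribution of element i
//       },
//       total);  // total == n once the scan has completed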
1768} // namespace Impl
1769} // namespace Kokkos
1770
1771namespace Kokkos {
1772namespace Impl {
1773template <class FunctorType, class... Properties>
1774class ParallelFor<FunctorType, Kokkos::TeamPolicy<Properties...>,
1775 Kokkos::Experimental::HPX> {
1776 private:
1777 using Policy = TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>;
1778 using WorkTag = typename Policy::work_tag;
1779 using Member = typename Policy::member_type;
1780 using memory_space = Kokkos::HostSpace;
1781
1782 const FunctorType m_functor;
1783 const Policy m_policy;
1784 const int m_league;
1785 const std::size_t m_shared;
1786
1787 template <class TagType>
1788 inline static std::enable_if_t<std::is_void<TagType>::value> execute_functor(
1789 const FunctorType &functor, const Policy &policy, const int league_rank,
1790 char *local_buffer, const std::size_t local_buffer_size) {
1791 functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size));
1792 }
1793
1794 template <class TagType>
1795 inline static std::enable_if_t<!std::is_void<TagType>::value> execute_functor(
1796 const FunctorType &functor, const Policy &policy, const int league_rank,
1797 char *local_buffer, const std::size_t local_buffer_size) {
1798 const TagType t{};
1799 functor(t, Member(policy, 0, league_rank, local_buffer, local_buffer_size));
1800 }
1801
1802 template <class TagType>
1803 inline static std::enable_if_t<std::is_void<TagType>::value>
1804 execute_functor_range(const FunctorType &functor, const Policy &policy,
1805 const int league_rank_begin, const int league_rank_end,
1806 char *local_buffer,
1807 const std::size_t local_buffer_size) {
1808 for (int league_rank = league_rank_begin; league_rank < league_rank_end;
1809 ++league_rank) {
1810 functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size));
1811 }
1812 }
1813
1814 template <class TagType>
1815 inline static std::enable_if_t<!std::is_void<TagType>::value>
1816 execute_functor_range(const FunctorType &functor, const Policy &policy,
1817 const int league_rank_begin, const int league_rank_end,
1818 char *local_buffer,
1819 const std::size_t local_buffer_size) {
1820 const TagType t{};
1821 for (int league_rank = league_rank_begin; league_rank < league_rank_end;
1822 ++league_rank) {
1823 functor(t,
1824 Member(policy, 0, league_rank, local_buffer, local_buffer_size));
1825 }
1826 }
1827
1828 public:
1829 void execute() const { dispatch_execute_task(this, m_policy.space()); }
1830
1831 inline void execute_task() const {
1832 // See [note 1] for an explanation.
1833 Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
1834 m_policy.space());
1835
1836 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1837
1838 thread_buffer &buffer = m_policy.space().impl_get_buffer();
1839 buffer.resize(num_worker_threads, m_shared);
1840
1841 auto exec = Kokkos::Experimental::HPX::impl_get_executor();
1842
1843 using hpx::execution::par;
1844 using hpx::execution::static_chunk_size;
1845
1846#if KOKKOS_HPX_IMPLEMENTATION == 0
1847 using hpx::for_loop;
1848
1849 for_loop(
1850 par.on(exec).with(static_chunk_size(m_policy.chunk_size())), 0,
1851 m_policy.league_size(), [this, &buffer](const int league_rank) {
1852 execute_functor<WorkTag>(
1853 m_functor, m_policy, league_rank,
1854 buffer.get(Kokkos::Experimental::HPX::impl_hardware_thread_id()),
1855 m_shared);
1856 });
1857
1858#elif KOKKOS_HPX_IMPLEMENTATION == 1
1859 using hpx::for_loop_strided;
1860
1861 for_loop_strided(
1862 par.on(exec), 0, m_policy.league_size(), m_policy.chunk_size(),
1863 [this, &buffer](const int league_rank_begin) {
1864 const int league_rank_end =
1865 (std::min)(league_rank_begin + m_policy.chunk_size(),
1866 m_policy.league_size());
1867 execute_functor_range<WorkTag>(
1868 m_functor, m_policy, league_rank_begin, league_rank_end,
1869 buffer.get(Kokkos::Experimental::HPX::impl_hardware_thread_id()),
1870 m_shared);
1871 });
1872#endif
1873 }
1874
1875 ParallelFor(const FunctorType &arg_functor, const Policy &arg_policy)
1876 : m_functor(arg_functor),
1877 m_policy(arg_policy),
1878 m_league(arg_policy.league_size()),
1879 m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
1880 FunctorTeamShmemSize<FunctorType>::value(
1881 arg_functor, arg_policy.team_size())) {}
1882};
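// Example: a team parallel_for dispatched through the class above. A sketch
// assuming the HPX backend is enabled; "num_leagues" and "do_work" are
// placeholders:
//
//   using policy_type = Kokkos::TeamPolicy<Kokkos::Experimental::HPX>;
//   Kokkos::parallel_for(
//       policy_type(num_leagues, Kokkos::AUTO),
//       KOKKOS_LAMBDA(const policy_type::member_type &member) {
//         do_work(member.league_rank());  // one call per league
//       });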
1883
1884template <class FunctorType, class ReducerType, class... Properties>
1885class ParallelReduce<FunctorType, Kokkos::TeamPolicy<Properties...>,
1886 ReducerType, Kokkos::Experimental::HPX> {
1887 private:
1888 using Policy = TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>;
1889 using Member = typename Policy::member_type;
1890 using WorkTag = typename Policy::work_tag;
1891 using ReducerConditional =
1892 Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
1893 FunctorType, ReducerType>;
1894 using ReducerTypeFwd = typename ReducerConditional::type;
1895 using Analysis =
1896 FunctorAnalysis<FunctorPatternInterface::REDUCE, Policy, ReducerTypeFwd>;
1897 using pointer_type = typename Analysis::pointer_type;
1898 using reference_type = typename Analysis::reference_type;
1899 using value_type = typename Analysis::value_type;
1900
1901 const FunctorType m_functor;
1902 const int m_league;
1903 const Policy m_policy;
1904 const ReducerType m_reducer;
1905 pointer_type m_result_ptr;
1906 const std::size_t m_shared;
1907
1908 bool m_force_synchronous;
1909
1910 template <class TagType>
1911 inline static std::enable_if_t<std::is_void<TagType>::value> execute_functor(
1912 const FunctorType &functor, const Policy &policy, const int league_rank,
1913 char *local_buffer, const std::size_t local_buffer_size,
1914 reference_type update) {
1915 functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size),
1916 update);
1917 }
1918
1919 template <class TagType>
1920 inline static std::enable_if_t<!std::is_void<TagType>::value> execute_functor(
1921 const FunctorType &functor, const Policy &policy, const int league_rank,
1922 char *local_buffer, const std::size_t local_buffer_size,
1923 reference_type update) {
1924 const TagType t{};
1925 functor(t, Member(policy, 0, league_rank, local_buffer, local_buffer_size),
1926 update);
1927 }
1928
1929 template <class TagType>
1930 inline static std::enable_if_t<std::is_void<TagType>::value>
1931 execute_functor_range(const FunctorType &functor, const Policy &policy,
1932 const int league_rank_begin, const int league_rank_end,
1933 char *local_buffer, const std::size_t local_buffer_size,
1934 reference_type update) {
1935 for (int league_rank = league_rank_begin; league_rank < league_rank_end;
1936 ++league_rank) {
1937 functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size),
1938 update);
1939 }
1940 }
1941
1942 template <class TagType>
1943 inline static std::enable_if_t<!std::is_void<TagType>::value>
1944 execute_functor_range(const FunctorType &functor, const Policy &policy,
1945 const int league_rank_begin, const int league_rank_end,
1946 char *local_buffer, const std::size_t local_buffer_size,
1947 reference_type update) {
1948 const TagType t{};
1949 for (int league_rank = league_rank_begin; league_rank < league_rank_end;
1950 ++league_rank) {
1951 functor(t,
1952 Member(policy, 0, league_rank, local_buffer, local_buffer_size),
1953 update);
1954 }
1955 }
1956
1957 public:
1958 void execute() const {
1959 if (m_policy.league_size() * m_policy.team_size() == 0) {
1960 if (m_result_ptr) {
1961 typename Analysis::Reducer final_reducer(
1962 &ReducerConditional::select(m_functor, m_reducer));
1963 final_reducer.init(m_result_ptr);
1964 final_reducer.final(m_result_ptr);
1965 }
1966 return;
1967 }
1968 dispatch_execute_task(this, m_policy.space());
1969 }
1970
1971 inline void execute_task() const {
1972 // See [note 1] for an explanation.
1973 Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
1974 m_policy.space());
1975
1976 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1977 const std::size_t value_size =
1978 Analysis::value_size(ReducerConditional::select(m_functor, m_reducer));
1979
1980 thread_buffer &buffer = m_policy.space().impl_get_buffer();
1981 buffer.resize(num_worker_threads, value_size + m_shared);
1982
1983 auto exec = Kokkos::Experimental::HPX::impl_get_executor();
1984
1985 using hpx::for_loop;
1986 using hpx::execution::par;
1987 using hpx::execution::static_chunk_size;
1988
1989 typename Analysis::Reducer final_reducer(
1990 &ReducerConditional::select(m_functor, m_reducer));
1991
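// Each worker accumulates into its own slot of "buffer": the first
// value_size bytes hold the thread's reduction value, and the remainder is
// handed to the team as scratch. The per-thread values are joined serially
// after the dispatch.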
1992#if KOKKOS_HPX_IMPLEMENTATION == 0
1993
1994 for_loop(
1995 par.on(exec).with(static_chunk_size(1)), 0, num_worker_threads,
1996 [&buffer, final_reducer](const std::size_t t) {
1997 final_reducer.init(reinterpret_cast<pointer_type>(buffer.get(t)));
1998 });
1999
2000 for_loop(par.on(exec).with(static_chunk_size(m_policy.chunk_size())), 0,
2001 m_policy.league_size(),
2002 [this, &buffer, value_size](const int league_rank) {
2003 std::size_t t =
2004 Kokkos::Experimental::HPX::impl_hardware_thread_id();
2005 reference_type update = Analysis::Reducer::reference(
2006 reinterpret_cast<pointer_type>(buffer.get(t)));
2007
2008 execute_functor<WorkTag>(m_functor, m_policy, league_rank,
2009 buffer.get(t) + value_size, m_shared,
2010 update);
2011 });
2012
2013#elif KOKKOS_HPX_IMPLEMENTATION == 1
2014 using hpx::for_loop_strided;
2015
2016 for_loop(
2017 par.on(exec).with(static_chunk_size(1)), 0, num_worker_threads,
2018 [&buffer, final_reducer](std::size_t const t) {
2019 final_reducer.init(reinterpret_cast<pointer_type>(buffer.get(t)));
2020 });
2021
2022 for_loop_strided(
2023 par.on(exec), 0, m_policy.league_size(), m_policy.chunk_size(),
2024 [this, &buffer, value_size](int const league_rank_begin) {
2025 std::size_t t = Kokkos::Experimental::HPX::impl_hardware_thread_id();
2026 reference_type update = Analysis::Reducer::reference(
2027 reinterpret_cast<pointer_type>(buffer.get(t)));
2028 const int league_rank_end =
2029 (std::min)(league_rank_begin + m_policy.chunk_size(),
2030 m_policy.league_size());
2031 execute_functor_range<WorkTag>(
2032 m_functor, m_policy, league_rank_begin, league_rank_end,
2033 buffer.get(t) + value_size, m_shared, update);
2034 });
2035#endif
2036
2037 const pointer_type ptr = reinterpret_cast<pointer_type>(buffer.get(0));
2038 for (int t = 1; t < num_worker_threads; ++t) {
2039 final_reducer.join(ptr, reinterpret_cast<pointer_type>(buffer.get(t)));
2040 }
2041
2042 final_reducer.final(ptr);
2043
2044 if (m_result_ptr) {
2045 const int n = Analysis::value_count(
2046 ReducerConditional::select(m_functor, m_reducer));
2047
2048 for (int j = 0; j < n; ++j) {
2049 m_result_ptr[j] = ptr[j];
2050 }
2051 }
2052 }
2053
2054 template <class ViewType>
2055 ParallelReduce(const FunctorType &arg_functor, const Policy &arg_policy,
2056 const ViewType &arg_result,
2057 std::enable_if_t<Kokkos::is_view<ViewType>::value &&
2058 !Kokkos::is_reducer<ReducerType>::value,
2059 void *> = nullptr)
2060 : m_functor(arg_functor),
2061 m_league(arg_policy.league_size()),
2062 m_policy(arg_policy),
2063 m_reducer(InvalidType()),
2064 m_result_ptr(arg_result.data()),
2065 m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
2066 FunctorTeamShmemSize<FunctorType>::value(
2067 m_functor, arg_policy.team_size())),
2068 m_force_synchronous(!arg_result.impl_track().has_record()) {}
2069
2070 inline ParallelReduce(const FunctorType &arg_functor, Policy arg_policy,
2071 const ReducerType &reducer)
2072 : m_functor(arg_functor),
2073 m_league(arg_policy.league_size()),
2074 m_policy(arg_policy),
2075 m_reducer(reducer),
2076 m_result_ptr(reducer.view().data()),
2077 m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
2078 FunctorTeamShmemSize<FunctorType>::value(
2079 arg_functor, arg_policy.team_size())),
2080 m_force_synchronous(!reducer.view().impl_track().has_record()) {}
2081};
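// Example: a team parallel_reduce handled by the class above. A sketch under
// the same assumptions as before, counting one contribution per league:
//
//   using policy_type = Kokkos::TeamPolicy<Kokkos::Experimental::HPX>;
//   long count = 0;
//   Kokkos::parallel_reduce(
//       policy_type(num_leagues, Kokkos::AUTO),
//       KOKKOS_LAMBDA(const policy_type::member_type &member, long &lsum) {
//         lsum += 1;
//       },
//       count);  // count == num_leagues afterwards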
2082} // namespace Impl
2083} // namespace Kokkos
2084
2085namespace Kokkos {
2086
2087template <typename iType>
2088KOKKOS_INLINE_FUNCTION
2089 Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2090 TeamThreadRange(const Impl::HPXTeamMember &thread, const iType &count) {
2091 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2092 thread, count);
2093}
2094
2095template <typename iType1, typename iType2>
2096KOKKOS_INLINE_FUNCTION Impl::TeamThreadRangeBoundariesStruct<
2097 std::common_type_t<iType1, iType2>, Impl::HPXTeamMember>
2098TeamThreadRange(const Impl::HPXTeamMember &thread, const iType1 &i_begin,
2099 const iType2 &i_end) {
2100 using iType = std::common_type_t<iType1, iType2>;
2101 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2102 thread, iType(i_begin), iType(i_end));
2103}
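// Example: from inside a team lambda, TeamThreadRange distributes an inner
// loop over the threads of the calling team. A sketch; "member" is the team
// handle and "n" and "A" are assumed to exist in the enclosing scope:
//
//   Kokkos::parallel_for(
//       Kokkos::TeamThreadRange(member, n),
//       [&](const int i) { A(member.league_rank(), i) = i; });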
2104
2105template <typename iType>
2106KOKKOS_INLINE_FUNCTION
2107 Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2108 TeamVectorRange(const Impl::HPXTeamMember &thread, const iType &count) {
2109 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2110 thread, count);
2111}
2112
2113template <typename iType1, typename iType2>
2114KOKKOS_INLINE_FUNCTION Impl::TeamThreadRangeBoundariesStruct<
2115 std::common_type_t<iType1, iType2>, Impl::HPXTeamMember>
2116TeamVectorRange(const Impl::HPXTeamMember &thread, const iType1 &i_begin,
2117 const iType2 &i_end) {
2118 using iType = std::common_type_t<iType1, iType2>;
2119 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2120 thread, iType(i_begin), iType(i_end));
2121}
2122
2123template <typename iType>
2124KOKKOS_INLINE_FUNCTION
2125 Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2126 ThreadVectorRange(const Impl::HPXTeamMember &thread, const iType &count) {
2127 return Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2128 thread, count);
2129}
2130
2131template <typename iType1, typename iType2>
2132KOKKOS_INLINE_FUNCTION Impl::ThreadVectorRangeBoundariesStruct<
2133 std::common_type_t<iType1, iType2>, Impl::HPXTeamMember>
2134ThreadVectorRange(const Impl::HPXTeamMember &thread, const iType1 &i_begin,
2135 const iType2 &i_end) {
2136 using iType = std::common_type_t<iType1, iType2>;
2137 return Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2138 thread, iType(i_begin), iType(i_end));
2139}
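// Example: ThreadVectorRange nests a vector-level loop inside a team-thread
// loop. A sketch; "member", "m", "n", and "A" are assumed:
//
//   Kokkos::parallel_for(Kokkos::TeamThreadRange(member, m), [&](const int i) {
//     Kokkos::parallel_for(Kokkos::ThreadVectorRange(member, n),
//                          [&](const int j) { A(i, j) = i + j; });
//   });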
2140
2141KOKKOS_INLINE_FUNCTION
2142Impl::ThreadSingleStruct<Impl::HPXTeamMember> PerTeam(
2143 const Impl::HPXTeamMember &thread) {
2144 return Impl::ThreadSingleStruct<Impl::HPXTeamMember>(thread);
2145}
2146
2147KOKKOS_INLINE_FUNCTION
2148Impl::VectorSingleStruct<Impl::HPXTeamMember> PerThread(
2149 const Impl::HPXTeamMember &thread) {
2150 return Impl::VectorSingleStruct<Impl::HPXTeamMember>(thread);
2151}
2152
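2153/** \brief Inter-thread parallel_for. Executes lambda(iType i) for each
2154 * i = 0..N-1.
2155 *
2156 * The range i = 0..N-1 is mapped to all threads of the calling thread team.
2157 */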
2158template <typename iType, class Lambda>
2159KOKKOS_INLINE_FUNCTION void parallel_for(
2160 const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2161 &loop_boundaries,
2162 const Lambda &lambda) {
2163 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2164 i += loop_boundaries.increment)
2165 lambda(i);
2166}
2167
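2168/** \brief Inter-thread parallel_reduce. Executes lambda(iType i,
2169 * ValueType &val) for each i = 0..N-1.
2170 *
2171 * The range i = 0..N-1 is mapped to all threads of the calling thread team
2172 * and a summation of val is performed and put into result.
2173 */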
2174template <typename iType, class Lambda, typename ValueType>
2175KOKKOS_INLINE_FUNCTION void parallel_reduce(
2176 const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2177 &loop_boundaries,
2178 const Lambda &lambda, ValueType &result) {
2179 result = ValueType();
2180 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2181 i += loop_boundaries.increment) {
2182 lambda(i, result);
2183 }
2184}
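// Example: a per-team sum using the overload above. A sketch; "member", "n",
// and "x" come from the enclosing team lambda:
//
//   double row_sum = 0.0;
//   Kokkos::parallel_reduce(
//       Kokkos::TeamThreadRange(member, n),
//       [&](const int i, double &lsum) { lsum += x(i); }, row_sum);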
2185
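2186/** \brief Intra-thread vector parallel_for. Executes lambda(iType i) for
2187 * each i = 0..N-1.
2188 *
2189 * The range i = 0..N-1 is mapped to all vector lanes of the calling thread.
2190 */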
2191template <typename iType, class Lambda>
2192KOKKOS_INLINE_FUNCTION void parallel_for(
2193 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2194 &loop_boundaries,
2195 const Lambda &lambda) {
2196#ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
2197#pragma ivdep
2198#endif
2199 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2200 i += loop_boundaries.increment) {
2201 lambda(i);
2202 }
2203}
2204
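2205/** \brief Intra-thread vector parallel_reduce. Executes lambda(iType i,
2206 * ValueType &val) for each i = 0..N-1.
2207 *
2208 * The range i = 0..N-1 is mapped to all vector lanes of the calling thread
2209 * and a summation of val is performed and put into result.
2210 */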
2211template <typename iType, class Lambda, typename ValueType>
2212KOKKOS_INLINE_FUNCTION void parallel_reduce(
2213 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2214 &loop_boundaries,
2215 const Lambda &lambda, ValueType &result) {
2216 result = ValueType();
2217#ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
2218#pragma ivdep
2219#endif
2220 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2221 i += loop_boundaries.increment) {
2222 lambda(i, result);
2223 }
2224}
2225
2226template <typename iType, class Lambda, typename ReducerType>
2227KOKKOS_INLINE_FUNCTION void parallel_reduce(
2228 const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2229 &loop_boundaries,
2230 const Lambda &lambda, const ReducerType &reducer) {
2231 reducer.init(reducer.reference());
2232 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2233 i += loop_boundaries.increment) {
2234 lambda(i, reducer.reference());
2235 }
2236}
2237
2238template <typename iType, class Lambda, typename ReducerType>
2239KOKKOS_INLINE_FUNCTION void parallel_reduce(
2240 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2241 &loop_boundaries,
2242 const Lambda &lambda, const ReducerType &reducer) {
2243 reducer.init(reducer.reference());
2244#ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
2245#pragma ivdep
2246#endif
2247 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2248 i += loop_boundaries.increment) {
2249 lambda(i, reducer.reference());
2250 }
2251}
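// Example: the reducer overloads accept built-in reducers such as
// Kokkos::Max. A sketch; "member", "n", and "x" are assumed:
//
//   double team_max;
//   Kokkos::parallel_reduce(
//       Kokkos::TeamThreadRange(member, n),
//       [&](const int i, double &lmax) { lmax = (x(i) > lmax) ? x(i) : lmax; },
//       Kokkos::Max<double>(team_max));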
2252
2253template <typename iType, class FunctorType>
2254KOKKOS_INLINE_FUNCTION void parallel_scan(
2255 Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember> const
2256 &loop_boundaries,
2257 const FunctorType &lambda) {
2258 using value_type = typename Kokkos::Impl::FunctorAnalysis<
2259 Kokkos::Impl::FunctorPatternInterface::SCAN, void,
2260 FunctorType>::value_type;
2261
2262 value_type scan_val = value_type();
2263
2264 // Intra-member scan
2265 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2266 i += loop_boundaries.increment) {
2267 lambda(i, scan_val, false);
2268 }
2269
2270 // 'scan_val' output is the exclusive prefix sum
2271 scan_val = loop_boundaries.thread.team_scan(scan_val);
2272
2273 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2274 i += loop_boundaries.increment) {
2275 lambda(i, scan_val, true);
2276 }
2277}
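// Example: an exclusive prefix sum over a team-thread range, implemented by
// the two-pass scheme above. A sketch; "member", "counts", and "offsets" are
// assumed. The contribution must be added to "partial" on both passes:
//
//   Kokkos::parallel_scan(
//       Kokkos::TeamThreadRange(member, n),
//       [&](const int i, int &partial, const bool final) {
//         if (final) offsets(i) = partial;
//         partial += counts(i);
//       });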
2278
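2279/** \brief Intra-thread vector parallel exclusive prefix sum. Executes
2280 * lambda(iType i, ValueType &val, bool final) for each i = 0..N-1.
2281 *
2282 * The range i = 0..N-1 is mapped to all vector lanes in the thread and a
2283 * scan operation is performed. Depending on the target execution space the
2284 * operator might be called twice: once with final = false and once with
2285 * final = true. When final == true, val contains the prefix sum value. The
2286 * contribution of this "i" must be added to val whether or not final is
2287 * true. In a serial execution (i.e. team_size == 1) the operator is only
2288 * called once, with final = true.
2289 */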
2290template <typename iType, class FunctorType>
2291KOKKOS_INLINE_FUNCTION void parallel_scan(
2292 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2293 &loop_boundaries,
2294 const FunctorType &lambda) {
2295 using value_type =
2296 typename Impl::FunctorAnalysis<Impl::FunctorPatternInterface::SCAN,
2297 TeamPolicy<Experimental::HPX>,
2298 FunctorType>::value_type;
2299
2300 value_type scan_val = value_type();
2301
2302#ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
2303#pragma ivdep
2304#endif
2305 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2306 i += loop_boundaries.increment) {
2307 lambda(i, scan_val, true);
2308 }
2309}
2310
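2311/** \brief Intra-thread vector parallel scan with reducer.
2312 *
2313 */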
2314template <typename iType, class FunctorType, typename ReducerType>
2315KOKKOS_INLINE_FUNCTION std::enable_if_t<Kokkos::is_reducer<ReducerType>::value>
2316parallel_scan(
2317 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2318 &loop_boundaries,
2319 const FunctorType &lambda, const ReducerType &reducer) {
2320 typename ReducerType::value_type scan_val;
2321 reducer.init(scan_val);
2322
2323#ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
2324#pragma ivdep
2325#endif
2326 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2327 i += loop_boundaries.increment) {
2328 lambda(i, scan_val, true);
2329 }
2330}
2331
2332template <class FunctorType>
2333KOKKOS_INLINE_FUNCTION void single(
2334 const Impl::VectorSingleStruct<Impl::HPXTeamMember> &,
2335 const FunctorType &lambda) {
2336 lambda();
2337}
2338
2339template <class FunctorType>
2340KOKKOS_INLINE_FUNCTION void single(
2341 const Impl::ThreadSingleStruct<Impl::HPXTeamMember> &,
2342 const FunctorType &lambda) {
2343 lambda();
2344}
2345
2346template <class FunctorType, class ValueType>
2347KOKKOS_INLINE_FUNCTION void single(
2348 const Impl::VectorSingleStruct<Impl::HPXTeamMember> &,
2349 const FunctorType &lambda, ValueType &val) {
2350 lambda(val);
2351}
2352
2353template <class FunctorType, class ValueType>
2354KOKKOS_INLINE_FUNCTION void single(
2355 const Impl::ThreadSingleStruct<Impl::HPXTeamMember> &,
2356 const FunctorType &lambda, ValueType &val) {
2357 lambda(val);
2358}
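// Example: single() executes its lambda exactly once per team (PerTeam) or
// once per thread (PerThread) rather than once per lane. A sketch; "member"
// and "flag" are assumed:
//
//   Kokkos::single(Kokkos::PerTeam(member), [&]() { flag() = 1; });
//
//   int v;
//   Kokkos::single(Kokkos::PerThread(member), [&](int &val) { val = 42; }, v);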
2359
2360} // namespace Kokkos
2361
2362#include <HPX/Kokkos_HPX_Task.hpp>
2363
2364#endif /* #if defined( KOKKOS_ENABLE_HPX ) */
2365#endif /* #ifndef KOKKOS_HPX_HPP */