You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

340 lines
10 KiB

commit b505a8d719c208073959eff07f4af202ef49a8a1
Author: Willy Tarreau <w@1wt.eu>
Date: Thu Aug 2 10:16:17 2018 +0200
MEDIUM: hathreads: implement a more flexible rendez-vous point
The current synchronization point enforces certain restrictions which
are hard to workaround in certain areas of the code. The fact that the
critical code can only be called from the sync point itself is a problem
for some callback-driven parts. The "show fd" command for example is
fragile regarding this.
Also it is expensive in terms of CPU usage because it wakes every other
thread just to be sure all of them join to the rendez-vous point. It's a
problem because the sleeping threads would not need to be woken up just
to know they're doing nothing.
Here we implement a different approach. We keep track of harmless threads,
which are defined as those either doing nothing, or doing harmless things.
The rendez-vous is used "for others" as a way for a thread to isolate itself.
A thread then requests to be alone using thread_isolate() when approaching
the dangerous area, and then waits until all other threads are either doing
the same or are doing something harmless (typically polling). The function
only returns once the thread is guaranteed to be alone, and the critical
section is terminated using thread_release().
(cherry picked from commit 60b639ccbe919b86790267d7e45a39b75434acbe)
[wt: this will be needed to fix the "show fd" command with threads]
Signed-off-by: Willy Tarreau <w@1wt.eu>
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 7eb5d127..f8fda87a 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -117,6 +117,27 @@ static inline void __ha_barrier_full(void)
{
}
+static inline void thread_harmless_now()
+{
+}
+
+static inline void thread_harmless_end()
+{
+}
+
+static inline void thread_isolate()
+{
+}
+
+static inline void thread_release()
+{
+}
+
+static inline unsigned long thread_isolated()
+{
+ return 1;
+}
+
#else /* USE_THREAD */
#include <stdio.h>
@@ -229,10 +250,34 @@ void thread_enter_sync(void);
void thread_exit_sync(void);
int thread_no_sync(void);
int thread_need_sync(void);
+void thread_harmless_till_end();
+void thread_isolate();
+void thread_release();
extern THREAD_LOCAL unsigned int tid; /* The thread id */
extern THREAD_LOCAL unsigned long tid_bit; /* The bit corresponding to the thread id */
extern volatile unsigned long all_threads_mask;
+extern volatile unsigned long threads_want_rdv_mask;
+extern volatile unsigned long threads_harmless_mask;
+
+/* explanation for threads_want_rdv_mask and threads_harmless_mask :
+ * - threads_want_rdv_mask is a bit field indicating all threads that have
+ * requested a rendez-vous of other threads using thread_isolate().
+ * - threads_harmless_mask is a bit field indicating all threads that are
+ * currently harmless in that they promise not to access a shared resource.
+ *
+ * For a given thread, its bits in want_rdv and harmless can be translated like
+ * this :
+ *
+ * ----------+----------+----------------------------------------------------
+ * want_rdv | harmless | description
+ * ----------+----------+----------------------------------------------------
+ * 0 | 0 | thread not interested in RDV, possibly harmful
+ * 0 | 1 | thread not interested in RDV but harmless
+ * 1 | 1 | thread interested in RDV and waiting for its turn
+ * 1 | 0 | thread currently working isolated from others
+ * ----------+----------+----------------------------------------------------
+ */
#define ha_sigmask(how, set, oldset) pthread_sigmask(how, set, oldset)
@@ -243,6 +288,38 @@ static inline void ha_set_tid(unsigned int data)
tid_bit = (1UL << tid);
}
+/* Marks the thread as harmless. Note: this must be true, i.e. the thread must
+ * not be touching any unprotected shared resource during this period. Usually
+ * this is called before poll(), but it may also be placed around very slow
+ * calls (eg: some crypto operations). Needs to be terminated using
+ * thread_harmless_end().
+ */
+static inline void thread_harmless_now()
+{
+ HA_ATOMIC_OR(&threads_harmless_mask, tid_bit);
+}
+
+/* Ends the harmless period started by thread_harmless_now(). Usually this is
+ * placed after the poll() call. If it is discovered that a job was running and
+ * is relying on the thread still being harmless, the thread waits for the
+ * other one to finish.
+ */
+static inline void thread_harmless_end()
+{
+ while (1) {
+ HA_ATOMIC_AND(&threads_harmless_mask, ~tid_bit);
+ if (likely((threads_want_rdv_mask & all_threads_mask) == 0))
+ break;
+ thread_harmless_till_end();
+ }
+}
+
+/* an isolated thread has harmless cleared and want_rdv set */
+static inline unsigned long thread_isolated()
+{
+ return threads_want_rdv_mask & ~threads_harmless_mask & tid_bit;
+}
+
#if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index adc15acd..09d1abb6 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -17,6 +17,7 @@
#include <common/config.h>
#include <common/debug.h>
#include <common/epoll.h>
+#include <common/hathreads.h>
#include <common/standard.h>
#include <common/ticks.h>
#include <common/time.h>
@@ -153,6 +154,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
}
HA_SPIN_UNLOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+ thread_harmless_now();
+
/* compute the epoll_wait() timeout */
if (!exp)
wait_time = MAX_DELAY_MS;
@@ -173,6 +176,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
tv_update_date(wait_time, status);
measure_idle();
+ thread_harmless_end();
+
/* process polled events */
for (count = 0; count < status; count++) {
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index 642de8b3..1f4762e6 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -19,6 +19,7 @@
#include <common/compat.h>
#include <common/config.h>
+#include <common/hathreads.h>
#include <common/ticks.h>
#include <common/time.h>
#include <common/tools.h>
@@ -127,6 +128,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
}
HA_SPIN_UNLOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+ thread_harmless_now();
+
if (changes) {
#ifdef EV_RECEIPT
kev[0].flags |= EV_RECEIPT;
@@ -169,6 +172,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
tv_update_date(delta_ms, status);
measure_idle();
+ thread_harmless_end();
+
for (count = 0; count < status; count++) {
unsigned int n = 0;
fd = kev[count].ident;
diff --git a/src/ev_poll.c b/src/ev_poll.c
index c913ced2..7da992d6 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -19,6 +19,7 @@
#include <common/compat.h>
#include <common/config.h>
+#include <common/hathreads.h>
#include <common/ticks.h>
#include <common/time.h>
@@ -149,6 +150,9 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
}
HA_SPIN_UNLOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+
+ thread_harmless_now();
+
fd_nbupdt = 0;
nbfd = 0;
@@ -200,6 +204,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
tv_update_date(wait_time, status);
measure_idle();
+ thread_harmless_end();
+
for (count = 0; status > 0 && count < nbfd; count++) {
unsigned int n;
int e = poll_events[count].revents;
diff --git a/src/ev_select.c b/src/ev_select.c
index bde923ea..9daf74d9 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -16,6 +16,7 @@
#include <common/compat.h>
#include <common/config.h>
+#include <common/hathreads.h>
#include <common/ticks.h>
#include <common/time.h>
@@ -123,6 +124,9 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
}
HA_SPIN_UNLOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+
+ thread_harmless_now();
+
fd_nbupdt = 0;
/* let's restore fdset state */
@@ -171,6 +175,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
tv_update_date(delta_ms, status);
measure_idle();
+ thread_harmless_end();
+
if (status <= 0)
return;
diff --git a/src/hathreads.c b/src/hathreads.c
index ba05fe27..97ed31c5 100644
--- a/src/hathreads.c
+++ b/src/hathreads.c
@@ -30,6 +30,8 @@ void thread_sync_io_handler(int fd)
static HA_SPINLOCK_T sync_lock;
static int threads_sync_pipe[2];
static unsigned long threads_want_sync = 0;
+volatile unsigned long threads_want_rdv_mask = 0;
+volatile unsigned long threads_harmless_mask = 0;
volatile unsigned long all_threads_mask = 1; // nbthread 1 assumed by default
THREAD_LOCAL unsigned int tid = 0;
THREAD_LOCAL unsigned long tid_bit = (1UL << 0);
@@ -163,6 +165,68 @@ void thread_exit_sync()
thread_sync_barrier(&barrier);
}
+/* Marks the thread as harmless until the last thread using the rendez-vous
+ * point quits. Given that we can wait for a long time, sched_yield() is used
+ * when available to offer the CPU resources to competing threads if needed.
+ */
+void thread_harmless_till_end()
+{
+ HA_ATOMIC_OR(&threads_harmless_mask, tid_bit);
+ while (threads_want_rdv_mask & all_threads_mask) {
+#if _POSIX_PRIORITY_SCHEDULING
+ sched_yield();
+#else
+ pl_cpu_relax();
+#endif
+ }
+}
+
+/* Isolates the current thread : request the ability to work while all other
+ * threads are harmless. Only returns once all of them are harmless, with the
+ * current thread's bit in threads_harmless_mask cleared. Needs to be completed
+ * using thread_release().
+ */
+void thread_isolate()
+{
+ unsigned long old;
+
+ HA_ATOMIC_OR(&threads_harmless_mask, tid_bit);
+ __ha_barrier_store();
+ HA_ATOMIC_OR(&threads_want_rdv_mask, tid_bit);
+
+ /* wait for all threads to become harmless */
+ old = threads_harmless_mask;
+ while (1) {
+ if (unlikely((old & all_threads_mask) != all_threads_mask))
+ old = threads_harmless_mask;
+ else if (HA_ATOMIC_CAS(&threads_harmless_mask, &old, old & ~tid_bit))
+ break;
+
+#if _POSIX_PRIORITY_SCHEDULING
+ sched_yield();
+#else
+ pl_cpu_relax();
+#endif
+ }
+ /* one thread gets released at a time here, with its harmess bit off.
+ * The loss of this bit makes the other one continue to spin while the
+ * thread is working alone.
+ */
+}
+
+/* Cancels the effect of thread_isolate() by releasing the current thread's bit
+ * in threads_want_rdv_mask and by marking this thread as harmless until the
+ * last worker finishes.
+ */
+void thread_release()
+{
+ while (1) {
+ HA_ATOMIC_AND(&threads_want_rdv_mask, ~tid_bit);
+ if (!(threads_want_rdv_mask & all_threads_mask))
+ break;
+ thread_harmless_till_end();
+ }
+}
__attribute__((constructor))
static void __hathreads_init(void)