RCU lock implementation

Introduce an RCU lock implementation as an alternative locking mechanism
to openssl.  The api is documented in the ossl_rcu.pod
file

Read side implementaiton is comparable to that of RWLOCKS:
ossl_rcu_read_lock(lock);
<
critical section in which data can be accessed via
ossl_derefrence
>
ossl_rcu_read_unlock(lock);

Write side implementation is:
ossl_rcu_write_lock(lock);
<
critical section in which data can be updated via
ossl_assign_pointer
and stale data can optionally be scheduled for removal
via ossl_rcu_call
>
ossl_rcu_write_unlock(lock);
...
ossl_synchronize_rcu(lock);

ossl_rcu_call fixup

Reviewed-by: Hugo Landau <hlandau@openssl.org>
Reviewed-by: Matt Caswell <matt@openssl.org>
(Merged from https://github.com/openssl/openssl/pull/22729)
This commit is contained in:
Neil Horman 2024-01-12 10:39:56 -05:00
parent de18dc3a63
commit d0e1a0ae70
7 changed files with 1693 additions and 4 deletions

22
crypto/rcu_internal.h Normal file
View File

@ -0,0 +1,22 @@
/*
* Copyright 2023 The OpenSSL Project Authors. All Rights Reserved.
*
* Licensed under the Apache License 2.0 (the "License"). You may not use
* this file except in compliance with the License. You can obtain a copy
* in the file LICENSE in the source distribution or at
* https://www.openssl.org/source/license.html
*/
#ifndef OPENSSL_RCU_INTERNAL_H
# define OPENSSL_RCU_INTERNAL_H
# pragma once
struct rcu_qp;
struct rcu_cb_item {
rcu_cb_fn fn;
void *data;
struct rcu_cb_item *next;
};
#endif

View File

@ -9,6 +9,8 @@
#include <openssl/crypto.h>
#include "internal/cryptlib.h"
#include "internal/rcu.h"
#include "rcu_internal.h"
#if !defined(OPENSSL_THREADS) || defined(CRYPTO_TDEBUG)
@ -17,6 +19,82 @@
# include <unistd.h>
# endif
struct rcu_lock_st {
struct rcu_cb_item *cb_items;
};
CRYPTO_RCU_LOCK *ossl_rcu_lock_new(int num_writers)
{
struct rcu_lock_st *lock;
lock = OPENSSL_zalloc(sizeof(*lock));
return lock;
}
void ossl_rcu_lock_free(CRYPTO_RCU_LOCK *lock)
{
OPENSSL_free(lock);
}
void ossl_rcu_read_lock(CRYPTO_RCU_LOCK *lock)
{
return;
}
void ossl_rcu_write_lock(CRYPTO_RCU_LOCK *lock)
{
return;
}
void ossl_rcu_write_unlock(CRYPTO_RCU_LOCK *lock)
{
return;
}
void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
{
return;
}
void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
{
struct rcu_cb_item *items = lock->cb_items;
struct rcu_cb_item *tmp;
lock->cb_items = NULL;
while (items != NULL) {
tmp = items->next;
items->fn(items->data);
OPENSSL_free(items);
items = tmp;
}
}
int ossl_rcu_call(CRYPTO_RCU_LOCK *lock, rcu_cb_fn cb, void *data)
{
struct rcu_cb_item *new = OPENSSL_zalloc(sizeof(*new));
if (new == NULL)
return 0;
new->fn = cb;
new->data = data;
new->next = lock->cb_items;
lock->cb_items = new;
return 1;
}
void *ossl_rcu_uptr_deref(void **p)
{
return (void *)*p;
}
void ossl_rcu_assign_uptr(void **p, void **v)
{
*(void **)p = *(void **)v;
}
CRYPTO_RWLOCK *CRYPTO_THREAD_lock_new(void)
{
CRYPTO_RWLOCK *lock;

View File

@ -11,7 +11,10 @@
#define OPENSSL_SUPPRESS_DEPRECATED
#include <openssl/crypto.h>
#include <crypto/cryptlib.h>
#include "internal/cryptlib.h"
#include "internal/rcu.h"
#include "rcu_internal.h"
#if defined(__sun)
# include <atomic.h>
@ -42,12 +45,577 @@
# define USE_RWLOCK
# endif
# if defined(__GNUC__) && defined(__ATOMIC_ACQUIRE) && !defined(BROKEN_CLANG_ATOMICS)
# define ATOMIC_LOAD_N(p,o) __atomic_load_n(p, o)
# define ATOMIC_STORE_N(p, v, o) __atomic_store_n(p, v, o)
# define ATOMIC_STORE(p, v, o) __atomic_store(p, v, o)
# define ATOMIC_EXCHANGE_N(p, v, o) __atomic_exchange_n(p, v, o)
# define ATOMIC_ADD_FETCH(p, v, o) __atomic_add_fetch(p, v, o)
# define ATOMIC_FETCH_ADD(p, v, o) __atomic_fetch_add(p, v, o)
# define ATOMIC_SUB_FETCH(p, v, o) __atomic_sub_fetch(p, v, o)
# define ATOMIC_AND_FETCH(p, m, o) __atomic_and_fetch(p, m, o)
# define ATOMIC_OR_FETCH(p, m, o) __atomic_or_fetch(p, m, o)
#else
static pthread_mutex_t atomic_sim_lock = PTHREAD_MUTEX_INITIALIZER;
static inline void *fallback_atomic_load_n(void **p)
{
void *ret;
pthread_mutex_lock(&atomic_sim_lock);
ret = *(void **)p;
pthread_mutex_unlock(&atomic_sim_lock);
return ret;
}
# define ATOMIC_LOAD_N(p, o) fallback_atomic_load_n((void **)p)
static inline void *fallback_atomic_store_n(void **p, void *v)
{
void *ret;
pthread_mutex_lock(&atomic_sim_lock);
ret = *p;
*p = v;
pthread_mutex_unlock(&atomic_sim_lock);
return ret;
}
# define ATOMIC_STORE_N(p, v, o) fallback_atomic_store_n((void **)p, (void *)v)
static inline void fallback_atomic_store(void **p, void **v)
{
void *ret;
pthread_mutex_lock(&atomic_sim_lock);
ret = *p;
*p = *v;
v = ret;
pthread_mutex_unlock(&atomic_sim_lock);
}
# define ATOMIC_STORE(p, v, o) fallback_atomic_store((void **)p, (void **)v)
static inline void *fallback_atomic_exchange_n(void **p, void *v)
{
void *ret;
pthread_mutex_lock(&atomic_sim_lock);
ret = *p;
*p = v;
pthread_mutex_unlock(&atomic_sim_lock);
return ret;
}
#define ATOMIC_EXCHANGE_N(p, v, o) fallback_atomic_exchange_n((void **)p, (void *)v)
static inline uint64_t fallback_atomic_add_fetch(uint64_t *p, uint64_t v)
{
uint64_t ret;
pthread_mutex_lock(&atomic_sim_lock);
*p += v;
ret = *p;
pthread_mutex_unlock(&atomic_sim_lock);
return ret;
}
# define ATOMIC_ADD_FETCH(p, v, o) fallback_atomic_add_fetch(p, v)
static inline uint64_t fallback_atomic_fetch_add(uint64_t *p, uint64_t v)
{
uint64_t ret;
pthread_mutex_lock(&atomic_sim_lock);
ret = *p;
*p += v;
pthread_mutex_unlock(&atomic_sim_lock);
return ret;
}
# define ATOMIC_FETCH_ADD(p, v, o) fallback_atomic_fetch_add(p, v)
static inline uint64_t fallback_atomic_sub_fetch(uint64_t *p, uint64_t v)
{
uint64_t ret;
pthread_mutex_lock(&atomic_sim_lock);
*p -= v;
ret = *p;
pthread_mutex_unlock(&atomic_sim_lock);
return ret;
}
# define ATOMIC_SUB_FETCH(p, v, o) fallback_atomic_sub_fetch(p, v)
static inline uint64_t fallback_atomic_and_fetch(uint64_t *p, uint64_t m)
{
uint64_t ret;
pthread_mutex_lock(&atomic_sim_lock);
*p &= m;
ret = *p;
pthread_mutex_unlock(&atomic_sim_lock);
return ret;
}
# define ATOMIC_AND_FETCH(p, v, o) fallback_atomic_and_fetch(p, v)
static inline uint64_t fallback_atomic_or_fetch(uint64_t *p, uint64_t m)
{
uint64_t ret;
pthread_mutex_lock(&atomic_sim_lock);
*p |= m;
ret = *p;
pthread_mutex_unlock(&atomic_sim_lock);
return ret;
}
# define ATOMIC_OR_FETCH(p, v, o) fallback_atomic_or_fetch(p, v)
#endif
static CRYPTO_THREAD_LOCAL rcu_thr_key;
/*
* users is broken up into 2 parts
* bits 0-15 current readers
* bit 32-63 - ID
*/
# define READER_SHIFT 0
# define ID_SHIFT 32
# define READER_SIZE 16
# define ID_SIZE 32
# define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)
# define ID_MASK (((uint64_t)1 << ID_SIZE) - 1)
# define READER_COUNT(x) (((uint64_t)(x) >> READER_SHIFT) & READER_MASK)
# define ID_VAL(x) (((uint64_t)(x) >> ID_SHIFT) & ID_MASK)
# define VAL_READER ((uint64_t)1 << READER_SHIFT)
# define VAL_ID(x) ((uint64_t)x << ID_SHIFT)
/*
* This is the core of an rcu lock. It tracks the readers and writers for the
* current quiescence point for a given lock. Users is the 64 bit value that
* stores the READERS/ID as defined above
*
*/
struct rcu_qp {
uint64_t users;
};
struct thread_qp {
struct rcu_qp *qp;
unsigned int depth;
CRYPTO_RCU_LOCK *lock;
};
#define MAX_QPS 10
/*
* This is the per thread tracking data
* that is assigned to each thread participating
* in an rcu qp
*
* qp points to the qp that it last acquired
*
*/
struct rcu_thr_data {
struct thread_qp thread_qps[MAX_QPS];
};
/*
* This is the internal version of a CRYPTO_RCU_LOCK
* it is cast from CRYPTO_RCU_LOCK
*/
struct rcu_lock_st {
/* Callbacks to call for next ossl_synchronize_rcu */
struct rcu_cb_item *cb_items;
/* rcu generation counter for in-order retirement */
uint32_t id_ctr;
/* Array of quiescent points for synchronization */
struct rcu_qp *qp_group;
/* Number of elements in qp_group array */
size_t group_count;
/* Index of the current qp in the qp_group array */
uint64_t reader_idx;
/* value of the next id_ctr value to be retired */
uint32_t next_to_retire;
/* index of the next free rcu_qp in the qp_group */
uint64_t current_alloc_idx;
/* number of qp's in qp_group array currently being retired */
uint32_t writers_alloced;
/* lock protecting write side operations */
pthread_mutex_t write_lock;
/* lock protecting updates to writers_alloced/current_alloc_idx */
pthread_mutex_t alloc_lock;
/* signal to wake threads waiting on alloc_lock */
pthread_cond_t alloc_signal;
/* lock to enforce in-order retirement */
pthread_mutex_t prior_lock;
/* signal to wake threads waiting on prior_lock */
pthread_cond_t prior_signal;
};
/*
* Called on thread exit to free the pthread key
* associated with this thread, if any
*/
static void free_rcu_thr_data(void *ptr)
{
struct rcu_thr_data *data =
(struct rcu_thr_data *)CRYPTO_THREAD_get_local(&rcu_thr_key);
OPENSSL_free(data);
CRYPTO_THREAD_set_local(&rcu_thr_key, NULL);
}
static void ossl_rcu_init(void)
{
CRYPTO_THREAD_init_local(&rcu_thr_key, NULL);
}
/* Read side acquisition of the current qp */
static struct rcu_qp *get_hold_current_qp(struct rcu_lock_st *lock)
{
uint64_t qp_idx;
/* get the current qp index */
for (;;) {
/*
* Notes on use of __ATOMIC_ACQUIRE
* We need to ensure the following:
* 1) That subsequent operations aren't optimized by hoisting them above
* this operation. Specifically, we don't want the below re-load of
* qp_idx to get optimized away
* 2) We want to ensure that any updating of reader_idx on the write side
* of the lock is flushed from a local cpu cache so that we see any
* updates prior to the load. This is a non-issue on cache coherent
* systems like x86, but is relevant on other arches
* Note: This applies to the reload below as well
*/
qp_idx = (uint64_t)ATOMIC_LOAD_N(&lock->reader_idx, __ATOMIC_ACQUIRE);
/*
* Notes of use of __ATOMIC_RELEASE
* This counter is only read by the write side of the lock, and so we
* specify __ATOMIC_RELEASE here to ensure that the write side of the
* lock see this during the spin loop read of users, as it waits for the
* reader count to approach zero
*/
ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
__ATOMIC_RELEASE);
/* if the idx hasn't changed, we're good, else try again */
if (qp_idx == (uint64_t)ATOMIC_LOAD_N(&lock->reader_idx, __ATOMIC_ACQUIRE))
break;
/*
* Notes on use of __ATOMIC_RELEASE
* As with the add above, we want to ensure that this decrement is
* seen by the write side of the lock as soon as it happens to prevent
* undue spinning waiting for write side completion
*/
ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
__ATOMIC_RELEASE);
}
return &lock->qp_group[qp_idx];
}
void ossl_rcu_read_lock(CRYPTO_RCU_LOCK *lock)
{
struct rcu_thr_data *data;
int i, available_qp = -1;
/*
* we're going to access current_qp here so ask the
* processor to fetch it
*/
data = CRYPTO_THREAD_get_local(&rcu_thr_key);
if (data == NULL) {
data = OPENSSL_zalloc(sizeof(*data));
OPENSSL_assert(data != NULL);
CRYPTO_THREAD_set_local(&rcu_thr_key, data);
ossl_init_thread_start(NULL, NULL, free_rcu_thr_data);
}
for (i = 0; i < MAX_QPS; i++) {
if (data->thread_qps[i].qp == NULL && available_qp == -1)
available_qp = i;
/* If we have a hold on this lock already, we're good */
if (data->thread_qps[i].lock == lock) {
data->thread_qps[i].depth++;
return;
}
}
/*
* if we get here, then we don't have a hold on this lock yet
*/
assert(available_qp != -1);
data->thread_qps[available_qp].qp = get_hold_current_qp(lock);
data->thread_qps[available_qp].depth = 1;
data->thread_qps[available_qp].lock = lock;
}
void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
{
int i;
struct rcu_thr_data *data = CRYPTO_THREAD_get_local(&rcu_thr_key);
uint64_t ret;
assert(data != NULL);
for (i = 0; i < MAX_QPS; i++) {
if (data->thread_qps[i].lock == lock) {
/*
* As with read side acquisition, we use __ATOMIC_RELEASE here
* to ensure that the decrement is published immediately
* to any write side waiters
*/
data->thread_qps[i].depth--;
if (data->thread_qps[i].depth == 0) {
ret = ATOMIC_SUB_FETCH(&data->thread_qps[i].qp->users, VAL_READER,
__ATOMIC_RELEASE);
OPENSSL_assert(ret != UINT64_MAX);
data->thread_qps[i].qp = NULL;
data->thread_qps[i].lock = NULL;
}
return;
}
}
/*
* if we get here, we're trying to unlock a lock that we never acquired
* thats fatal
*/
assert(0);
}
/*
* Write side allocation routine to get the current qp
* and replace it with a new one
*/
static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
{
uint64_t new_id;
uint64_t current_idx;
pthread_mutex_lock(&lock->alloc_lock);
/*
* we need at least one qp to be available with one
* left over, so that readers can start working on
* one that isn't yet being waited on
*/
while (lock->group_count - lock->writers_alloced < 2)
/* we have to wait for one to be free */
pthread_cond_wait(&lock->alloc_signal, &lock->alloc_lock);
current_idx = lock->current_alloc_idx;
/* Allocate the qp */
lock->writers_alloced++;
/* increment the allocation index */
lock->current_alloc_idx =
(lock->current_alloc_idx + 1) % lock->group_count;
/* get and insert a new id */
new_id = lock->id_ctr;
lock->id_ctr++;
new_id = VAL_ID(new_id);
/*
* Even though we are under a write side lock here
* We need to use atomic instructions to ensure that the results
* of this update are published to the read side prior to updating the
* reader idx below
*/
ATOMIC_AND_FETCH(&lock->qp_group[current_idx].users, ID_MASK,
__ATOMIC_RELEASE);
ATOMIC_OR_FETCH(&lock->qp_group[current_idx].users, new_id,
__ATOMIC_RELEASE);
/*
* update the reader index to be the prior qp
* Note the use of __ATOMIC_RELEASE here is based on the corresponding use
* of __ATOMIC_ACQUIRE in get_hold_current_qp, as we wan't any publication
* of this value to be seen on the read side immediately after it happens
*/
ATOMIC_STORE_N(&lock->reader_idx, lock->current_alloc_idx,
__ATOMIC_RELEASE);
/* wake up any waiters */
pthread_cond_signal(&lock->alloc_signal);
pthread_mutex_unlock(&lock->alloc_lock);
return &lock->qp_group[current_idx];
}
static void retire_qp(CRYPTO_RCU_LOCK *lock, struct rcu_qp *qp)
{
pthread_mutex_lock(&lock->alloc_lock);
lock->writers_alloced--;
pthread_cond_signal(&lock->alloc_signal);
pthread_mutex_unlock(&lock->alloc_lock);
}
static struct rcu_qp *allocate_new_qp_group(CRYPTO_RCU_LOCK *lock,
int count)
{
struct rcu_qp *new =
OPENSSL_zalloc(sizeof(*new) * count);
lock->group_count = count;
return new;
}
void ossl_rcu_write_lock(CRYPTO_RCU_LOCK *lock)
{
pthread_mutex_lock(&lock->write_lock);
}
void ossl_rcu_write_unlock(CRYPTO_RCU_LOCK *lock)
{
pthread_mutex_unlock(&lock->write_lock);
}
void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
{
struct rcu_qp *qp;
uint64_t count;
struct rcu_cb_item *cb_items, *tmpcb;
/*
* __ATOMIC_ACQ_REL is used here to ensure that we get any prior published
* writes before we read, and publish our write immediately
*/
cb_items = ATOMIC_EXCHANGE_N(&lock->cb_items, NULL, __ATOMIC_ACQ_REL);
qp = update_qp(lock);
/*
* wait for the reader count to reach zero
* Note the use of __ATOMIC_ACQUIRE here to ensure that any
* prior __ATOMIC_RELEASE write operation in get_hold_current_qp
* is visible prior to our read
*/
do {
count = (uint64_t)ATOMIC_LOAD_N(&qp->users, __ATOMIC_ACQUIRE);
} while (READER_COUNT(count) != 0);
/* retire in order */
pthread_mutex_lock(&lock->prior_lock);
while (lock->next_to_retire != ID_VAL(count))
pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
lock->next_to_retire++;
pthread_cond_broadcast(&lock->prior_signal);
pthread_mutex_unlock(&lock->prior_lock);
retire_qp(lock, qp);
/* handle any callbacks that we have */
while (cb_items != NULL) {
tmpcb = cb_items;
cb_items = cb_items->next;
tmpcb->fn(tmpcb->data);
OPENSSL_free(tmpcb);
}
}
int ossl_rcu_call(CRYPTO_RCU_LOCK *lock, rcu_cb_fn cb, void *data)
{
struct rcu_cb_item *new =
OPENSSL_zalloc(sizeof(*new));
if (new == NULL)
return 0;
new->data = data;
new->fn = cb;
/*
* Use __ATOMIC_ACQ_REL here to indicate that any prior writes to this
* list are visible to us prior to reading, and publish the new value
* immediately
*/
new->next = ATOMIC_EXCHANGE_N(&lock->cb_items, new, __ATOMIC_ACQ_REL);
return 1;
}
void *ossl_rcu_uptr_deref(void **p)
{
return (void *)ATOMIC_LOAD_N(p, __ATOMIC_ACQUIRE);
}
void ossl_rcu_assign_uptr(void **p, void **v)
{
ATOMIC_STORE(p, v, __ATOMIC_RELEASE);
}
static CRYPTO_ONCE rcu_init_once = CRYPTO_ONCE_STATIC_INIT;
CRYPTO_RCU_LOCK *ossl_rcu_lock_new(int num_writers)
{
struct rcu_lock_st *new;
if (!CRYPTO_THREAD_run_once(&rcu_init_once, ossl_rcu_init))
return NULL;
if (num_writers < 1)
num_writers = 1;
new = OPENSSL_zalloc(sizeof(*new));
if (new == NULL)
return NULL;
pthread_mutex_init(&new->write_lock, NULL);
pthread_mutex_init(&new->prior_lock, NULL);
pthread_mutex_init(&new->alloc_lock, NULL);
pthread_cond_init(&new->prior_signal, NULL);
pthread_cond_init(&new->alloc_signal, NULL);
new->qp_group = allocate_new_qp_group(new, num_writers + 1);
if (new->qp_group == NULL) {
OPENSSL_free(new);
new = NULL;
}
return new;
}
void ossl_rcu_lock_free(CRYPTO_RCU_LOCK *lock)
{
struct rcu_lock_st *rlock = (struct rcu_lock_st *)lock;
if (lock == NULL)
return;
/* make sure we're synchronized */
ossl_synchronize_rcu(rlock);
OPENSSL_free(rlock->qp_group);
/* There should only be a single qp left now */
OPENSSL_free(rlock);
}
CRYPTO_RWLOCK *CRYPTO_THREAD_lock_new(void)
{
# ifdef USE_RWLOCK
CRYPTO_RWLOCK *lock;
if ((lock = CRYPTO_zalloc(sizeof(pthread_rwlock_t), NULL, 0)) == NULL)
if ((lock = OPENSSL_zalloc(sizeof(pthread_rwlock_t))) == NULL)
/* Don't set error, to avoid recursion blowup. */
return NULL;
@ -59,7 +627,7 @@ CRYPTO_RWLOCK *CRYPTO_THREAD_lock_new(void)
pthread_mutexattr_t attr;
CRYPTO_RWLOCK *lock;
if ((lock = CRYPTO_zalloc(sizeof(pthread_mutex_t), NULL, 0)) == NULL)
if ((lock = OPENSSL_zalloc(sizeof(pthread_mutex_t))) == NULL)
/* Don't set error, to avoid recursion blowup. */
return NULL;

View File

@ -13,6 +13,7 @@
# define USE_RWLOCK
# endif
#endif
#include <assert.h>
/*
* VC++ 2008 or earlier x86 compilers do not have an inline implementation
@ -27,6 +28,11 @@
#endif
#include <openssl/crypto.h>
#include <crypto/cryptlib.h>
#include "internal/common.h"
#include "internal/thread_arch.h"
#include "internal/rcu.h"
#include "rcu_internal.h"
#if defined(OPENSSL_THREADS) && !defined(CRYPTO_TDEBUG) && defined(OPENSSL_SYS_WINDOWS)
@ -37,20 +43,370 @@ typedef struct {
} CRYPTO_win_rwlock;
# endif
static CRYPTO_THREAD_LOCAL rcu_thr_key;
# define READER_SHIFT 0
# define ID_SHIFT 32
# define READER_SIZE 32
# define ID_SIZE 32
# define READER_MASK (((LONG64)1 << READER_SIZE)-1)
# define ID_MASK (((LONG64)1 << ID_SIZE)-1)
# define READER_COUNT(x) (((LONG64)(x) >> READER_SHIFT) & READER_MASK)
# define ID_VAL(x) (((LONG64)(x) >> ID_SHIFT) & ID_MASK)
# define VAL_READER ((LONG64)1 << READER_SHIFT)
# define VAL_ID(x) ((LONG64)x << ID_SHIFT)
/*
* This defines a quescent point (qp)
* This is the barrier beyond which a writer
* must wait before freeing data that was
* atomically updated
*/
struct rcu_qp {
volatile LONG64 users;
};
struct thread_qp {
struct rcu_qp *qp;
unsigned int depth;
CRYPTO_RCU_LOCK *lock;
};
#define MAX_QPS 10
/*
* This is the per thread tracking data
* that is assigned to each thread participating
* in an rcu qp
*
* qp points to the qp that it last acquired
*
*/
struct rcu_thr_data {
struct thread_qp thread_qps[MAX_QPS];
};
/*
* This is the internal version of a CRYPTO_RCU_LOCK
* it is cast from CRYPTO_RCU_LOCK
*/
struct rcu_lock_st {
struct rcu_cb_item *cb_items;
uint32_t id_ctr;
struct rcu_qp *qp_group;
size_t group_count;
uint32_t next_to_retire;
volatile long int reader_idx;
uint32_t current_alloc_idx;
uint32_t writers_alloced;
CRYPTO_MUTEX *write_lock;
CRYPTO_MUTEX *alloc_lock;
CRYPTO_CONDVAR *alloc_signal;
CRYPTO_MUTEX *prior_lock;
CRYPTO_CONDVAR *prior_signal;
};
/*
* Called on thread exit to free the pthread key
* associated with this thread, if any
*/
static void free_rcu_thr_data(void *ptr)
{
struct rcu_thr_data *data =
(struct rcu_thr_data *)CRYPTO_THREAD_get_local(&rcu_thr_key);
OPENSSL_free(data);
CRYPTO_THREAD_set_local(&rcu_thr_key, NULL);
}
static void ossl_rcu_init(void)
{
CRYPTO_THREAD_init_local(&rcu_thr_key, NULL);
ossl_init_thread_start(NULL, NULL, free_rcu_thr_data);
}
static struct rcu_qp *allocate_new_qp_group(struct rcu_lock_st *lock,
int count)
{
struct rcu_qp *new =
OPENSSL_zalloc(sizeof(*new) * count);
lock->group_count = count;
return new;
}
static CRYPTO_ONCE rcu_init_once = CRYPTO_ONCE_STATIC_INIT;
CRYPTO_RCU_LOCK *ossl_rcu_lock_new(int num_writers)
{
struct rcu_lock_st *new;
if (!CRYPTO_THREAD_run_once(&rcu_init_once, ossl_rcu_init))
return NULL;
if (num_writers < 1)
num_writers = 1;
new = OPENSSL_zalloc(sizeof(*new));
if (new == NULL)
return NULL;
new->write_lock = ossl_crypto_mutex_new();
new->alloc_signal = ossl_crypto_condvar_new();
new->prior_signal = ossl_crypto_condvar_new();
new->alloc_lock = ossl_crypto_mutex_new();
new->prior_lock = ossl_crypto_mutex_new();
new->write_lock = ossl_crypto_mutex_new();
new->qp_group = allocate_new_qp_group(new, num_writers + 1);
if (new->qp_group == NULL
|| new->alloc_signal == NULL
|| new->prior_signal == NULL
|| new->write_lock == NULL
|| new->alloc_lock == NULL
|| new->prior_lock == NULL) {
OPENSSL_free(new->qp_group);
ossl_crypto_condvar_free(&new->alloc_signal);
ossl_crypto_condvar_free(&new->prior_signal);
ossl_crypto_mutex_free(&new->alloc_lock);
ossl_crypto_mutex_free(&new->prior_lock);
ossl_crypto_mutex_free(&new->write_lock);
OPENSSL_free(new);
new = NULL;
}
return new;
}
void ossl_rcu_lock_free(CRYPTO_RCU_LOCK *lock)
{
OPENSSL_free(lock->qp_group);
ossl_crypto_condvar_free(&lock->alloc_signal);
ossl_crypto_condvar_free(&lock->prior_signal);
ossl_crypto_mutex_free(&lock->alloc_lock);
ossl_crypto_mutex_free(&lock->prior_lock);
ossl_crypto_mutex_free(&lock->write_lock);
OPENSSL_free(lock);
}
static inline struct rcu_qp *get_hold_current_qp(CRYPTO_RCU_LOCK *lock)
{
uint32_t qp_idx;
/* get the current qp index */
for (;;) {
qp_idx = InterlockedOr(&lock->reader_idx, 0);
InterlockedAdd64(&lock->qp_group[qp_idx].users, VAL_READER);
if (qp_idx == InterlockedOr(&lock->reader_idx, 0))
break;
InterlockedAdd64(&lock->qp_group[qp_idx].users, -VAL_READER);
}
return &lock->qp_group[qp_idx];
}
void ossl_rcu_read_lock(CRYPTO_RCU_LOCK *lock)
{
struct rcu_thr_data *data;
int i;
int available_qp = -1;
/*
* we're going to access current_qp here so ask the
* processor to fetch it
*/
data = CRYPTO_THREAD_get_local(&rcu_thr_key);
if (data == NULL) {
data = OPENSSL_zalloc(sizeof(*data));
OPENSSL_assert(data != NULL);
CRYPTO_THREAD_set_local(&rcu_thr_key, data);
}
for (i = 0; i < MAX_QPS; i++) {
if (data->thread_qps[i].qp == NULL && available_qp == -1)
available_qp = i;
/* If we have a hold on this lock already, we're good */
if (data->thread_qps[i].lock == lock)
return;
}
/*
* if we get here, then we don't have a hold on this lock yet
*/
assert(available_qp != -1);
data->thread_qps[available_qp].qp = get_hold_current_qp(lock);
data->thread_qps[available_qp].depth = 1;
data->thread_qps[available_qp].lock = lock;
}
void ossl_rcu_write_lock(CRYPTO_RCU_LOCK *lock)
{
ossl_crypto_mutex_lock(lock->write_lock);
}
void ossl_rcu_write_unlock(CRYPTO_RCU_LOCK *lock)
{
ossl_crypto_mutex_unlock(lock->write_lock);
}
void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
{
struct rcu_thr_data *data = CRYPTO_THREAD_get_local(&rcu_thr_key);
int i;
LONG64 ret;
assert(data != NULL);
for (i = 0; i < MAX_QPS; i++) {
if (data->thread_qps[i].lock == lock) {
data->thread_qps[i].depth--;
if (data->thread_qps[i].depth == 0) {
ret = InterlockedAdd64(&data->thread_qps[i].qp->users, -VAL_READER);
OPENSSL_assert(ret >= 0);
data->thread_qps[i].qp = NULL;
data->thread_qps[i].lock = NULL;
}
return;
}
}
}
static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
{
uint64_t new_id;
uint32_t current_idx;
uint32_t tmp;
ossl_crypto_mutex_lock(lock->alloc_lock);
/*
* we need at least one qp to be available with one
* left over, so that readers can start working on
* one that isn't yet being waited on
*/
while (lock->group_count - lock->writers_alloced < 2)
ossl_crypto_condvar_wait(lock->alloc_signal, lock->alloc_lock);
current_idx = lock->current_alloc_idx;
/* Allocate the qp */
lock->writers_alloced++;
/* increment the allocation index */
lock->current_alloc_idx =
(lock->current_alloc_idx + 1) % lock->group_count;
/* get and insert a new id */
new_id = lock->id_ctr;
lock->id_ctr++;
new_id = VAL_ID(new_id);
InterlockedAnd64(&lock->qp_group[current_idx].users, ID_MASK);
InterlockedAdd64(&lock->qp_group[current_idx].users, new_id);
/* update the reader index to be the prior qp */
tmp = lock->current_alloc_idx;
InterlockedExchange(&lock->reader_idx, tmp);
/* wake up any waiters */
ossl_crypto_condvar_broadcast(lock->alloc_signal);
ossl_crypto_mutex_unlock(lock->alloc_lock);
return &lock->qp_group[current_idx];
}
static void retire_qp(CRYPTO_RCU_LOCK *lock,
struct rcu_qp *qp)
{
ossl_crypto_mutex_lock(lock->alloc_lock);
lock->writers_alloced--;
ossl_crypto_condvar_broadcast(lock->alloc_signal);
ossl_crypto_mutex_unlock(lock->alloc_lock);
}
void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
{
struct rcu_qp *qp;
uint64_t count;
struct rcu_cb_item *cb_items, *tmpcb;
/* before we do anything else, lets grab the cb list */
cb_items = InterlockedExchangePointer((void * volatile *)&lock->cb_items, NULL);
qp = update_qp(lock);
/* wait for the reader count to reach zero */
do {
count = InterlockedOr64(&qp->users, 0);
} while (READER_COUNT(count) != 0);
/* retire in order */
ossl_crypto_mutex_lock(lock->prior_lock);
while (lock->next_to_retire != ID_VAL(count))
ossl_crypto_condvar_wait(lock->prior_signal, lock->prior_lock);
lock->next_to_retire++;
ossl_crypto_condvar_broadcast(lock->prior_signal);
ossl_crypto_mutex_unlock(lock->prior_lock);
retire_qp(lock, qp);
/* handle any callbacks that we have */
while (cb_items != NULL) {
tmpcb = cb_items;
cb_items = cb_items->next;
tmpcb->fn(tmpcb->data);
OPENSSL_free(tmpcb);
}
/* and we're done */
return;
}
int ossl_rcu_call(CRYPTO_RCU_LOCK *lock, rcu_cb_fn cb, void *data)
{
struct rcu_cb_item *new;
struct rcu_cb_item *prev;
new = OPENSSL_zalloc(sizeof(struct rcu_cb_item));
if (new == NULL)
return 0;
prev = new;
new->data = data;
new->fn = cb;
InterlockedExchangePointer((void * volatile *)&lock->cb_items, prev);
new->next = prev;
return 1;
}
void *ossl_rcu_uptr_deref(void **p)
{
return (void *)*p;
}
void ossl_rcu_assign_uptr(void **p, void **v)
{
InterlockedExchangePointer((void * volatile *)p, (void *)*v);
}
CRYPTO_RWLOCK *CRYPTO_THREAD_lock_new(void)
{
CRYPTO_RWLOCK *lock;
# ifdef USE_RWLOCK
CRYPTO_win_rwlock *rwlock;
if ((lock = CRYPTO_zalloc(sizeof(CRYPTO_win_rwlock), NULL, 0)) == NULL)
if ((lock = OPENSSL_zalloc(sizeof(CRYPTO_win_rwlock))) == NULL)
/* Don't set error, to avoid recursion blowup. */
return NULL;
rwlock = lock;
InitializeSRWLock(&rwlock->lock);
# else
if ((lock = CRYPTO_zalloc(sizeof(CRITICAL_SECTION), NULL, 0)) == NULL)
if ((lock = OPENSSL_zalloc(sizeof(CRITICAL_SECTION))) == NULL)
/* Don't set error, to avoid recursion blowup. */
return NULL;

View File

@ -0,0 +1,258 @@
=pod
=head1 NAME
ossl_rcu_lock_new,
ossl_rcu_lock_free, ossl_rcu_read_lock,
ossl_rcu_read_unlock, ossl_rcu_write_lock,
ossl_rcu_write_unlock, ossl_synchronize_rcu,
ossl_rcu_call, ossl_rcu_deref,
ossl_rcu_assign_ptr, ossl_rcu_uptr_deref,
ossl_rcu_assign_uptr
- perform read-copy-update locking
=head1 SYNOPSIS
CRYPTO_RCU_LOCK *ossl_rcu_lock_new(int num_writers);
void ossl_rcu_read_lock(CRYPTO_RCU_LOCK *lock);
void ossl_rcu_write_lock(CRYPTO_RCU_LOCK *lock);
void ossl_rcu_write_unlock(CRYPTO_RCU_LOCK *lock);
void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock);
void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock);
void ossl_rcu_call(CRYPTO_RCU_LOCK *lock, rcu_cb_fn cb, void *data);
void *ossl_rcu_deref(void **p);
void ossl_rcu_uptr_deref(void **p);
void ossl_rcu_assign_ptr(void **p, void **v);
void ossl_rcu_assign_uptr(void **p, void **v);
void ossl_rcu_lock_free(CRYPTO_RCU_LOCK *lock);
=head1 DESCRIPTION
OpenSSL can be safely used in multi-threaded applications provided that
support for the underlying OS threading API is built-in. Currently, OpenSSL
supports the pthread and Windows APIs. OpenSSL can also be built without
any multi-threading support, for example on platforms that don't provide
any threading support or that provide a threading API that is not yet
supported by OpenSSL.
In addition to more traditional Read/Write locks, OpenSSL provides
Read-Copy-Update (RCU) locks, which allow for always nonblocking read paths.
The following multi-threading functions are provided:
=over 2
=item *
ossl_rcu_assign_uptr() assigns the value pointed to by v to the
location pointed to by p. This function should typically not be used, rely
instead on the ossl_rcu_assign_ptr() macro.
=item *
ossl_rcu_uptr_deref() returns the value stored at the
location pointed to by p. This function should typically not be used, rely
instead on the ossl_rcu_deref() macro.
=item *
ossl_rcu_assign_ptr() assigns the value pointed to by v to
location pointed to by p.
=item *
ossl_rcu_lock_new() allocates a new RCU lock. The I<num_writers> param
indicates the number of write side threads which may execute
ossl_synchronize_rcu() in parallel. The value must be at least 1, but may be
larger to obtain increased write side throughput at the cost of additional
internal memory usage. A value of 1 is generally recommended.
=item *
ossl_rcu_read_lock() acquires a read side hold on data protected by
the lock.
=item *
ossl_rcu_read_unlock() releases a read side hold on data protected by
the lock.
=item *
ossl_rcu_write_lock() acquires a write side hold on data protected by
the lock. Note only one writer per lock is permitted, as with read/write locks.
=item *
ossl_rcu_write_unlock() releases a write side hold on data protected
by the lock.
=item *
ossl_synchronize_rcu() blocks the calling thread until all read side
holds on the lock have been released, guaranteeing that any old data updated by
the write side thread is safe to free.
=item *
ossl_rcu_call() enqueues a callback function to the lock, to be called
when the next synchronization completes. Note: It is not guaranteed that the
thread which enqueued the callback will be the thread which executes the
callback
=item *
ossl_rcu_deref(p) atomically reads a pointer under an RCU locks
protection
=item *
ossl_rcu_assign_ptr(p,v) atomically writes to a pointer under an
RCU locks protection
=item *
ossl_rcu_lock_free() frees an allocated RCU lock
=back
=head1 RETURN VALUES
ossl_rcu_lock_new() returns a pointer to a newly created RCU lock structure.
ossl_rcu_deref() and ossl_rcu_uptr_deref() return the value pointed
to by the passed in value v.
All other functions return no value.
=head1 EXAMPLES
You can find out if OpenSSL was configured with thread support:
#include <openssl/opensslconf.h>
#if defined(OPENSSL_THREADS)
/* thread support enabled */
#else
/* no thread support */
#endif
This example safely initializes and uses a lock.
#include "internal/rcu.h"
struct foo {
int aval;
char *name;
};
static CRYPTO_ONCE once = CRYPTO_ONCE_STATIC_INIT;
static CRYPTO_RCU_LOCK *lock;
static struct foo *fooptr = NULL;
static void myinit(void)
{
lock = ossl_rcu_lock_new(1);
}
static int initlock(void)
{
if (!RUN_ONCE(&once, myinit) || lock == NULL)
return 0;
return 1;
}
static void writer_thread()
{
struct foo *newfoo;
struct foo *oldfoo;
initlock();
/*
* update steps in an rcu model
*/
/*
* 1) create a new shared object
*/
newfoo = OPENSSL_zalloc(sizeof(struct foo));
/*
* acquire the write side lock
*/
ossl_rcu_write_lock(lock);
/*
* 2) read the old pointer
*/
oldfoo = ossl_rcu_deref(&fooptr);
/*
* 3) Copy the old pointer to the new object, and
* make any needed adjustments
*/
memcpy(newfoo, oldfoo, sizeof(struct foo));
newfoo->aval++;
/*
* 4) Update the shared pointer to the new value
*/
ossl_rcu_assign_ptr(&fooptr, &newfoo);
/*
* 5) Release the write side lock
*/
ossl_rcu_write_unlock(lock);
/*
* 6) wait for any read side holds on the old data
* to be released
*/
ossl_synchronize_rcu(lock);
/*
* 7) free the old pointer, now that there are no
* further readers
*/
OPENSSL_free(oldfoo);
}
static void reader_thread()
{
struct foo *myfoo = NULL;
int a;
/*
* 1) Acquire a read side hold on the shared data
*/
ossl_rcu_read_lock(lock);
/*
* 2) Access the shared data pointer
*/
myfoo = ossl_rcu_deref(&fooptr);
/*
* 3) Read the data from the pointer
*/
a = myfoo->aval;
/*
* 4) Indicate our hold on the shared data is complete
*/
ossl_rcu_read_unlock(lock);
}
=head1 SEE ALSO
L<crypto(7)>, L<openssl-threads(7)>.
=head1 COPYRIGHT
Copyright 2023 The OpenSSL Project Authors. All Rights Reserved.
Licensed under the Apache License 2.0 (the "License"). You may not use
this file except in compliance with the License. You can obtain a copy
in the file LICENSE in the source distribution or at
L<https://www.openssl.org/source/license.html>.
=cut

31
include/internal/rcu.h Normal file
View File

@ -0,0 +1,31 @@
/*
* Copyright 2023 The OpenSSL Project Authors. All Rights Reserved.
*
* Licensed under the Apache License 2.0 (the "License"). You may not use
* this file except in compliance with the License. You can obtain a copy
* in the file LICENSE in the source distribution or at
* https://www.openssl.org/source/license.html
*/
#ifndef OPENSSL_RCU_H
# define OPENSSL_RCU_H
# pragma once
typedef void (*rcu_cb_fn)(void *data);
typedef struct rcu_lock_st CRYPTO_RCU_LOCK;
CRYPTO_RCU_LOCK *ossl_rcu_lock_new(int num_writers);
void ossl_rcu_lock_free(CRYPTO_RCU_LOCK *lock);
void ossl_rcu_read_lock(CRYPTO_RCU_LOCK *lock);
void ossl_rcu_write_lock(CRYPTO_RCU_LOCK *lock);
void ossl_rcu_write_unlock(CRYPTO_RCU_LOCK *lock);
void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock);
void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock);
int ossl_rcu_call(CRYPTO_RCU_LOCK *lock, rcu_cb_fn cb, void *data);
void *ossl_rcu_uptr_deref(void **p);
void ossl_rcu_assign_uptr(void **p, void **v);
#define ossl_rcu_deref(p) ossl_rcu_uptr_deref((void **)p)
#define ossl_rcu_assign_ptr(p,v) ossl_rcu_assign_uptr((void **)p, (void **)v)
#endif

View File

@ -29,9 +29,18 @@
#include <openssl/evp.h>
#include "internal/tsan_assist.h"
#include "internal/nelem.h"
#include "internal/time.h"
#include "internal/rcu.h"
#include "testutil.h"
#include "threadstest.h"
#ifdef __SANITIZE_THREAD__
#include <sanitizer/tsan_interface.h>
#define TSAN_ACQUIRE(s) __tsan_acquire(s)
#else
#define TSAN_ACQUIRE(s)
#endif
/* Limit the maximum number of threads */
#define MAXIMUM_THREADS 10
@ -91,6 +100,367 @@ static int test_lock(void)
return res;
}
#if defined(OPENSSL_THREADS)
static int contention = 0;
static int rwwriter1_done = 0;
static int rwwriter2_done = 0;
static int rwreader1_iterations = 0;
static int rwreader2_iterations = 0;
static int rwwriter1_iterations = 0;
static int rwwriter2_iterations = 0;
static int *rwwriter_ptr = NULL;
static int rw_torture_result = 1;
static CRYPTO_RWLOCK *rwtorturelock = NULL;
static void rwwriter_fn(int id, int *iterations)
{
int count;
int *old, *new;
OSSL_TIME t1, t2;
t1 = ossl_time_now();
for (count = 0; ; count++) {
new = CRYPTO_zalloc(sizeof (int), NULL, 0);
if (contention == 0)
OSSL_sleep(1000);
if (!CRYPTO_THREAD_write_lock(rwtorturelock))
abort();
if (rwwriter_ptr != NULL) {
*new = *rwwriter_ptr + 1;
} else {
*new = 0;
}
old = rwwriter_ptr;
rwwriter_ptr = new;
if (!CRYPTO_THREAD_unlock(rwtorturelock))
abort();
if (old != NULL)
CRYPTO_free(old, __FILE__, __LINE__);
t2 = ossl_time_now();
if ((ossl_time2seconds(t2) - ossl_time2seconds(t1)) >= 4)
break;
}
*iterations = count;
return;
}
static void rwwriter1_fn(void)
{
int local;
TEST_info("Starting writer1");
rwwriter_fn(1, &rwwriter1_iterations);
CRYPTO_atomic_add(&rwwriter1_done, 1, &local, NULL);
}
static void rwwriter2_fn(void)
{
int local;
TEST_info("Starting writer 2");
rwwriter_fn(2, &rwwriter2_iterations);
CRYPTO_atomic_add(&rwwriter2_done, 1, &local, NULL);
}
static void rwreader_fn(int *iterations)
{
unsigned int count = 0;
int old = 0;
int lw1 = 0;
int lw2 = 0;
if (CRYPTO_THREAD_read_lock(rwtorturelock) == 0)
abort();
while (lw1 != 1 || lw2 != 1) {
CRYPTO_atomic_add(&rwwriter1_done, 0, &lw1, NULL);
CRYPTO_atomic_add(&rwwriter2_done, 0, &lw2, NULL);
count++;
if (rwwriter_ptr != NULL && old > *rwwriter_ptr) {
TEST_info("rwwriter pointer went backwards\n");
rw_torture_result = 0;
}
if (CRYPTO_THREAD_unlock(rwtorturelock) == 0)
abort();
*iterations = count;
if (rw_torture_result == 0) {
*iterations = count;
return;
}
if (CRYPTO_THREAD_read_lock(rwtorturelock) == 0)
abort();
}
*iterations = count;
if (CRYPTO_THREAD_unlock(rwtorturelock) == 0)
abort();
}
static void rwreader1_fn(void)
{
TEST_info("Starting reader 1");
rwreader_fn(&rwreader1_iterations);
}
static void rwreader2_fn(void)
{
TEST_info("Starting reader 2");
rwreader_fn(&rwreader2_iterations);
}
static thread_t rwwriter1;
static thread_t rwwriter2;
static thread_t rwreader1;
static thread_t rwreader2;
static int _torture_rw(void)
{
double tottime = 0;
int ret = 0;
double avr, avw;
OSSL_TIME t1, t2;
struct timeval dtime;
rwtorturelock = CRYPTO_THREAD_lock_new();
rwwriter1_iterations = 0;
rwwriter2_iterations = 0;
rwreader1_iterations = 0;
rwreader2_iterations = 0;
rwwriter1_done = 0;
rwwriter2_done = 0;
rw_torture_result = 1;
memset(&rwwriter1, 0, sizeof(thread_t));
memset(&rwwriter2, 0, sizeof(thread_t));
memset(&rwreader1, 0, sizeof(thread_t));
memset(&rwreader2, 0, sizeof(thread_t));
TEST_info("Staring rw torture");
t1 = ossl_time_now();
if (!TEST_true(run_thread(&rwreader1, rwreader1_fn))
|| !TEST_true(run_thread(&rwreader2, rwreader2_fn))
|| !TEST_true(run_thread(&rwwriter1, rwwriter1_fn))
|| !TEST_true(run_thread(&rwwriter2, rwwriter2_fn))
|| !TEST_true(wait_for_thread(rwwriter1))
|| !TEST_true(wait_for_thread(rwwriter2))
|| !TEST_true(wait_for_thread(rwreader1))
|| !TEST_true(wait_for_thread(rwreader2)))
goto out;
t2 = ossl_time_now();
dtime = ossl_time_to_timeval(ossl_time_subtract(t2, t1));
tottime = dtime.tv_sec + (dtime.tv_usec / 1e6);
TEST_info("rw_torture_result is %d\n", rw_torture_result);
TEST_info("performed %d reads and %d writes over 2 read and 2 write threads in %e seconds",
rwreader1_iterations + rwreader2_iterations,
rwwriter1_iterations + rwwriter2_iterations, tottime);
avr = tottime / (rwreader1_iterations + rwreader2_iterations);
avw = (tottime / (rwwriter1_iterations + rwwriter2_iterations));
TEST_info("Average read time %e/read", avr);
TEST_info("Averate write time %e/write", avw);
if (TEST_int_eq(rw_torture_result, 1))
ret = 1;
out:
CRYPTO_THREAD_lock_free(rwtorturelock);
rwtorturelock = NULL;
return ret;
}
static int torture_rw_low(void)
{
contention = 0;
return _torture_rw();
}
static int torture_rw_high(void)
{
contention = 1;
return _torture_rw();
}
static CRYPTO_RCU_LOCK *rcu_lock = NULL;
static int writer1_done = 0;
static int writer2_done = 0;
static int reader1_iterations = 0;
static int reader2_iterations = 0;
static int writer1_iterations = 0;
static int writer2_iterations = 0;
static unsigned int *writer_ptr = NULL;
static unsigned int global_ctr = 0;
static int rcu_torture_result = 1;
static void free_old_rcu_data(void *data)
{
CRYPTO_free(data, NULL, 0);
}
static void writer_fn(int id, int *iterations)
{
int count;
OSSL_TIME t1, t2;
unsigned int *old, *new;
t1 = ossl_time_now();
for (count = 0; ; count++) {
new = CRYPTO_zalloc(sizeof(int), NULL, 0);
if (contention == 0)
OSSL_sleep(1000);
ossl_rcu_write_lock(rcu_lock);
old = ossl_rcu_deref(&writer_ptr);
TSAN_ACQUIRE(&writer_ptr);
*new = global_ctr++;
ossl_rcu_assign_ptr(&writer_ptr, &new);
if (contention == 0)
ossl_rcu_call(rcu_lock, free_old_rcu_data, old);
ossl_rcu_write_unlock(rcu_lock);
if (contention != 0) {
ossl_synchronize_rcu(rcu_lock);
CRYPTO_free(old, NULL, 0);
}
t2 = ossl_time_now();
if ((ossl_time2seconds(t2) - ossl_time2seconds(t1)) >= 4)
break;
}
*iterations = count;
return;
}
static void writer1_fn(void)
{
int local;
TEST_info("Starting writer1");
writer_fn(1, &writer1_iterations);
CRYPTO_atomic_add(&writer1_done, 1, &local, NULL);
}
static void writer2_fn(void)
{
int local;
TEST_info("Starting writer2");
writer_fn(2, &writer2_iterations);
CRYPTO_atomic_add(&writer2_done, 1, &local, NULL);
}
static void reader_fn(int *iterations)
{
unsigned int count = 0;
unsigned int *valp;
unsigned int val;
unsigned int oldval = 0;
int lw1 = 0;
int lw2 = 0;
while (lw1 != 1 || lw2 != 1) {
CRYPTO_atomic_add(&writer1_done, 0, &lw1, NULL);
CRYPTO_atomic_add(&writer2_done, 0, &lw2, NULL);
count++;
ossl_rcu_read_lock(rcu_lock);
valp = ossl_rcu_deref(&writer_ptr);
val = (valp == NULL) ? 0 : *valp;
if (oldval > val) {
TEST_info("rcu torture value went backwards! (%p) %x : %x\n", (void *)valp, oldval, val);
rcu_torture_result = 0;
}
oldval = val; /* just try to deref the pointer */
ossl_rcu_read_unlock(rcu_lock);
if (rcu_torture_result == 0) {
*iterations = count;
return;
}
}
*iterations = count;
}
static void reader1_fn(void)
{
TEST_info("Starting reader 1");
reader_fn(&reader1_iterations);
}
static void reader2_fn(void)
{
TEST_info("Starting reader 2");
reader_fn(&reader2_iterations);
}
static thread_t writer1;
static thread_t writer2;
static thread_t reader1;
static thread_t reader2;
static int _torture_rcu(void)
{
OSSL_TIME t1, t2;
struct timeval dtime;
double tottime;
double avr, avw;
memset(&writer1, 0, sizeof(thread_t));
memset(&writer2, 0, sizeof(thread_t));
memset(&reader1, 0, sizeof(thread_t));
memset(&reader2, 0, sizeof(thread_t));
writer1_iterations = 0;
writer2_iterations = 0;
reader1_iterations = 0;
reader2_iterations = 0;
writer1_done = 0;
writer2_done = 0;
rcu_torture_result = 1;
rcu_lock = ossl_rcu_lock_new(1);
TEST_info("Staring rcu torture");
t1 = ossl_time_now();
if (!TEST_true(run_thread(&reader1, reader1_fn))
|| !TEST_true(run_thread(&reader2, reader2_fn))
|| !TEST_true(run_thread(&writer1, writer1_fn))
|| !TEST_true(run_thread(&writer2, writer2_fn))
|| !TEST_true(wait_for_thread(writer1))
|| !TEST_true(wait_for_thread(writer2))
|| !TEST_true(wait_for_thread(reader1))
|| !TEST_true(wait_for_thread(reader2)))
return 0;
t2 = ossl_time_now();
dtime = ossl_time_to_timeval(ossl_time_subtract(t2, t1));
tottime = dtime.tv_sec + (dtime.tv_usec / 1e6);
TEST_info("rcu_torture_result is %d\n", rcu_torture_result);
TEST_info("performed %d reads and %d writes over 2 read and 2 write threads in %e seconds",
reader1_iterations + reader2_iterations,
writer1_iterations + writer2_iterations, tottime);
avr = tottime / (reader1_iterations + reader2_iterations);
avw = tottime / (writer1_iterations + writer2_iterations);
TEST_info("Average read time %e/read", avr);
TEST_info("Average write time %e/write", avw);
ossl_rcu_lock_free(rcu_lock);
if (!TEST_int_eq(rcu_torture_result, 1))
return 0;
return 1;
}
static int torture_rcu_low(void)
{
contention = 0;
return _torture_rcu();
}
static int torture_rcu_high(void)
{
contention = 1;
return _torture_rcu();
}
#endif
static CRYPTO_ONCE once_run = CRYPTO_ONCE_STATIC_INIT;
static unsigned once_run_count = 0;
@ -850,6 +1220,12 @@ int setup_tests(void)
ADD_TEST(test_multi_default);
ADD_TEST(test_lock);
#if defined(OPENSSL_THREADS)
ADD_TEST(torture_rw_low);
ADD_TEST(torture_rw_high);
ADD_TEST(torture_rcu_low);
ADD_TEST(torture_rcu_high);
#endif
ADD_TEST(test_once);
ADD_TEST(test_thread_local);
ADD_TEST(test_atomic);