diff --git a/.gitignore b/.gitignore index 91f4f528..7631db29 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ /.bundle/ -/lib/semian/*.so -/lib/semian/*.bundle +/lib/**/*.so +/lib/**/*.bundle /tmp/* *.gem /html/ diff --git a/ext/semian/extconf.rb b/ext/semian/extconf.rb index 597e1ff5..55f85424 100644 --- a/ext/semian/extconf.rb +++ b/ext/semian/extconf.rb @@ -23,7 +23,7 @@ have_func 'rb_thread_blocking_region' have_func 'rb_thread_call_without_gvl' -$CFLAGS = "-D_GNU_SOURCE -Werror -Wall " +$CFLAGS = "-D_GNU_SOURCE -Werror -Wall -std=c99 " if ENV.key?('DEBUG') $CFLAGS << "-O0 -g" else diff --git a/ext/semian/semian.c b/ext/semian/semian.c index 821872a4..ef888477 100644 --- a/ext/semian/semian.c +++ b/ext/semian/semian.c @@ -1,38 +1,6 @@ -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include - -#include - -union semun { - int val; /* Value for SETVAL */ - struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ - unsigned short *array; /* Array for GETALL, SETALL */ - struct seminfo *__buf; /* Buffer for IPC_INFO - (Linux-specific) */ -}; - -#if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H) -// 2.0 -#include -#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_call_without_gvl((fn),(a),(ubf),(b)) -#elif defined(HAVE_RB_THREAD_BLOCKING_REGION) - // 1.9 -typedef VALUE (*my_blocking_fn_t)(void*); -#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b)) -#endif +#include "semian.h" static ID id_timeout; -static VALUE eSyscall, eTimeout, eInternal; static int system_max_semaphore_count; static const int kIndexTickets = 0; @@ -48,7 +16,7 @@ typedef struct { char *name; } semian_resource_t; -static key_t +key_t generate_key(const char *name) { union { @@ -67,7 +35,7 @@ ms_to_timespec(long ms, struct timespec *ts) ts->tv_nsec = (ms % 1000) * 1000000; } -static void +void raise_semian_syscall_error(const char *syscall, int error_num) { rb_raise(eSyscall, "%s failed, errno: %d (%s)", syscall, error_num, strerror(error_num)); @@ -115,7 +83,7 @@ semian_resource_alloc(VALUE klass) return obj; } -static void +void set_semaphore_permissions(int sem_id, int permissions) { union semun sem_opts; @@ -447,6 +415,9 @@ semian_resource_id(VALUE self) return LONG2FIX(res->sem_id); } + +void Init_semian_shm_object(); + void Init_semian() { VALUE cSemian, cResource; @@ -504,4 +475,6 @@ void Init_semian() /* Maximum number of tickets available on this system. */ rb_define_const(cSemian, "MAX_TICKETS", INT2FIX(system_max_semaphore_count)); + + Init_semian_shm_object(); } diff --git a/ext/semian/semian.h b/ext/semian/semian.h new file mode 100644 index 00000000..d6eca09e --- /dev/null +++ b/ext/semian/semian.h @@ -0,0 +1,48 @@ +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include + +#include +#include + +#include + +union semun { + int val; /* Value for SETVAL */ + struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ + unsigned short *array; /* Array for GETALL, SETALL */ + struct seminfo *__buf; /* Buffer for IPC_INFO + (Linux-specific) */ +}; + +#if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H) +// 2.0 +#include +#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_call_without_gvl((fn),(a),(ubf),(b)) +#elif defined(HAVE_RB_THREAD_BLOCKING_REGION) + // 1.9 +typedef VALUE (*my_blocking_fn_t)(void*); +#define WITHOUT_GVL(fn,a,ubf,b) rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b)) +#endif + +VALUE eSyscall, eTimeout, eInternal; + +key_t +generate_key(const char *name); + +void +raise_semian_syscall_error(const char *syscall, int error_num); + +void +set_semaphore_permissions(int sem_id, int permissions); diff --git a/ext/semian/semian_integer.c b/ext/semian/semian_integer.c new file mode 100644 index 00000000..d9d0ce9b --- /dev/null +++ b/ext/semian/semian_integer.c @@ -0,0 +1,105 @@ +#include "semian_shared_memory_object.h" + +typedef struct { + int value; +} semian_int; + +static void semian_integer_initialize_memory (size_t byte_size, void *dest, void *prev_data, size_t prev_data_byte_size); +static VALUE semian_integer_bind_initialize_memory_callback(VALUE self); +static VALUE semian_integer_get_value(VALUE self); +static VALUE semian_integer_set_value(VALUE self, VALUE num); +static VALUE semian_integer_increment(int argc, VALUE *argv, VALUE self); + +static void +semian_integer_initialize_memory (size_t byte_size, void *dest, void *prev_data, size_t prev_data_byte_size) +{ + semian_int *ptr = dest; + semian_int *old = prev_data; + if (prev_data){ + ptr->value = old->value; + } else { + ptr->value=0; + } +} + +static VALUE +semian_integer_bind_initialize_memory_callback(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + ptr->initialize_memory = &semian_integer_initialize_memory; + return self; +} + +static VALUE +semian_integer_get_value(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + int value = ((semian_int *)(ptr->shm_address))->value; + return INT2NUM(value); +} + +static VALUE +semian_integer_set_value(VALUE self, VALUE num) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + if (TYPE(num) != T_FIXNUM && TYPE(num) != T_FLOAT) + return Qnil; + ((semian_int *)(ptr->shm_address))->value = NUM2INT(num); + return num; +} + +static VALUE +semian_integer_reset(VALUE self) +{ + return semian_integer_set_value(self, INT2NUM(0)); +} + +static VALUE +semian_integer_increment(int argc, VALUE *argv, VALUE self) +{ + VALUE num; + rb_scan_args(argc, argv, "01", &num); + if (num == Qnil) + num = INT2NUM(1); + + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + if (TYPE(num) != T_FIXNUM && TYPE(num) != T_FLOAT) + return Qnil; + ((semian_int *)(ptr->shm_address))->value += NUM2INT(num); + return self; +} + +static VALUE +semian_integer_calculate_byte_size(VALUE klass) +{ + return SIZET2NUM(sizeof(int)); +} + +void +Init_semian_integer (void) +{ + // Bind methods to Integer + VALUE cSemianModule = rb_const_get(rb_cObject, rb_intern("Semian")); + VALUE cSysVSharedMemory = rb_const_get(cSemianModule, rb_intern("SysVSharedMemory")); + VALUE cSysVModule = rb_const_get(cSemianModule, rb_intern("SysV")); + VALUE cInteger = rb_const_get(cSysVModule, rb_intern("Integer")); + + semian_shm_object_replace_alloc(cSysVSharedMemory, cInteger); + + rb_define_private_method(cInteger, "bind_initialize_memory_callback", semian_integer_bind_initialize_memory_callback, 0); + define_method_with_synchronize(cInteger, "value", semian_integer_get_value, 0); + define_method_with_synchronize(cInteger, "value=", semian_integer_set_value, 1); + define_method_with_synchronize(cInteger, "reset", semian_integer_reset, 0); + define_method_with_synchronize(cInteger, "increment", semian_integer_increment, -1); + rb_define_method(cInteger, "calculate_byte_size", semian_integer_calculate_byte_size, 0); +} diff --git a/ext/semian/semian_shared_memory_object.c b/ext/semian/semian_shared_memory_object.c new file mode 100644 index 00000000..f67b8ab1 --- /dev/null +++ b/ext/semian/semian_shared_memory_object.c @@ -0,0 +1,420 @@ +#include "semian_shared_memory_object.h" + +const int kSHMSemaphoreCount = 1; // semaphores to be acquired +const int kSHMTicketMax = 1; +const int kSHMInitializeWaitTimeout = 5; /* seconds */ +const int kSHMIndexTicketLock = 0; +const int kSHMInternalTimeout = 5; /* seconds */ +const int kSHMRestoreLockStateRetryCount = 5; // perform semtimedop 5 times max + +static struct sembuf decrement; // = { kSHMIndexTicketLock, -1, SEM_UNDO}; +static struct sembuf increment; // = { kSHMIndexTicketLock, 1, SEM_UNDO}; + +/* + * Functions that handle type and memory +*/ +static void semian_shm_object_mark(void *ptr); +static void semian_shm_object_free(void *ptr); +static size_t semian_shm_object_memsize(const void *ptr); + +const rb_data_type_t +semian_shm_object_type = { + "semian_shm_object", + { + semian_shm_object_mark, + semian_shm_object_free, + semian_shm_object_memsize + }, + NULL, NULL, RUBY_TYPED_FREE_IMMEDIATELY +}; + +static void +semian_shm_object_mark(void *ptr) +{ + /* noop */ +} +static void +semian_shm_object_free(void *ptr) +{ + semian_shm_object *data = (semian_shm_object *)ptr; + // Under normal circumstances, memory use should be in the order of bytes, and shouldn't + // increase if the same key/id is used, so there is no need to delete the shared memory + // (also raises a concurrency-related bug: "object allocation during garbage collection phase") + xfree(data); +} +static size_t +semian_shm_object_memsize(const void *ptr) +{ + return sizeof(semian_shm_object); +} +static VALUE +semian_shm_object_alloc(VALUE klass) +{ + VALUE obj; + semian_shm_object *ptr; + obj = TypedData_Make_Struct(klass, semian_shm_object, &semian_shm_object_type, ptr); + return obj; +} + +/* + * Implementations + */ + +VALUE +semian_shm_object_replace_alloc(VALUE klass, VALUE target) +{ + rb_define_alloc_func(target, semian_shm_object_alloc); + return target; +} + +VALUE +semian_shm_object_acquire(VALUE self, VALUE name, VALUE byte_size, VALUE permissions) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + + if (TYPE(name) != T_SYMBOL && TYPE(name) != T_STRING) + rb_raise(rb_eTypeError, "id must be a symbol or string"); + if (TYPE(byte_size) != T_FIXNUM) + rb_raise(rb_eTypeError, "expected integer for byte_size"); + if (TYPE(permissions) != T_FIXNUM) + rb_raise(rb_eTypeError, "expected integer for permissions"); + + if (NUM2SIZET(byte_size) <= 0) + rb_raise(rb_eArgError, "total size must be larger than 0"); + + const char *id_str = NULL; + if (TYPE(name) == T_SYMBOL) { + id_str = rb_id2name(rb_to_id(name)); + } else if (TYPE(name) == T_STRING) { + id_str = RSTRING_PTR(name); + } + ptr->key = generate_key(id_str); + ptr->byte_size = NUM2SIZET(byte_size); // byte_size >=1 or error would have been raised earlier + ptr->semid = -1; // id's default to -1 + ptr->shmid = -1; + ptr->shm_address = 0; // address defaults to NULL + ptr->lock_count = 0; // Emulates recursive mutex, 0->1 locks, 1->0 unlocks, rest noops + ptr->permissions = FIX2LONG(permissions); + ptr->initialize_memory = NULL; + + // Concrete classes must implement this in a subclass in C to bind a callback function of type + // void (*initialize_memory)(size_t byte_size, void *dest, void *prev_data, size_t prev_data_byte_size); + // to location ptr->initialize_memory, where ptr is a semian_shm_object* + // It is called when memory needs to be initialized or resized, possibly using previous memory + rb_funcall(self, rb_intern("bind_initialize_memory_callback"), 0); + if (NULL == ptr->initialize_memory) + rb_raise(rb_eNotImpError, "callback was not bound to ptr->initialize_memory"); + semian_shm_object_acquire_semaphore(self); + semian_shm_object_synchronize(self); + + return Qtrue; +} + +VALUE +semian_shm_object_destroy(VALUE self) +{ + VALUE result = semian_shm_object_cleanup_memory(self); + if (!result) + return Qfalse; + result = semian_shm_object_delete_semaphore(self); + return result; +} + +/* + * Create or acquire previously made semaphore + */ + +static int +create_semaphore_and_initialize_and_set_permissions(int key, int permissions) +{ + int semid = 0; + int flags = 0; + + flags = IPC_EXCL | IPC_CREAT | permissions; + + semid = semget(key, kSHMSemaphoreCount, flags); + if (semid >= 0) { + if (-1 == semctl(semid, 0, SETVAL, kSHMTicketMax)) { + rb_warn("semctl: failed to set semaphore with semid %d, position 0 to %d", semid, 1); + raise_semian_syscall_error("semctl()", errno); + } + } else if (semid == -1 && errno == EEXIST) { + flags &= ~IPC_EXCL; + semid = semget(key, kSHMSemaphoreCount, flags); + } + + if (-1 != semid){ + set_semaphore_permissions(semid, permissions); // Borrowed from semian.c + } + + return semid; +} + + +VALUE +semian_shm_object_acquire_semaphore (VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + + if (-1 == (ptr->semid = create_semaphore_and_initialize_and_set_permissions(ptr->key, ptr->permissions))) { + raise_semian_syscall_error("semget()", errno); + } + return self; +} + +VALUE +semian_shm_object_delete_semaphore(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (-1 == ptr->semid){ // do nothing if semaphore not acquired + return Qfalse; + } + if (-1 == semctl(ptr->semid, 0, IPC_RMID)) { + if (EIDRM == errno) { + rb_warn("semctl: failed to delete semaphore set with semid %d: already removed", ptr->semid); + raise_semian_syscall_error("semctl()", errno); + ptr->semid = -1; + } else { + rb_warn("semctl: failed to remove semaphore with semid %d, errno %d (%s)", ptr->semid, errno, strerror(errno)); + raise_semian_syscall_error("semctl()", errno); + } + } else { + ptr->semid = -1; + } + return self; +} + +/* + * lock & unlock functions, should be called like + * (VALUE) WITHOUT_GVL(semian_shm_object_unlock_without_gvl, (void *)ptr, RUBY_UBF_IO, NULL) + */ + +static void * +semian_shm_object_lock_without_gvl(void *v_ptr) +{ + semian_shm_object *ptr = v_ptr; + if (-1 == ptr->semid) { + rb_raise(eInternal, "semid not set, errno %d: (%s)", errno, strerror(errno)); + } + struct timespec ts = { 0 }; + ts.tv_sec = kSHMInternalTimeout; + if (0 != ptr->lock_count || -1 != semtimedop(ptr->semid, &decrement, 1, &ts)) { + ptr->lock_count += 1; + } else { + rb_raise(eInternal, "error acquiring semaphore lock to mutate circuit breaker structure, %d: (%s)", errno, strerror(errno)); + } + return (void *)Qtrue; +} + +static void * +semian_shm_object_unlock_without_gvl(void *v_ptr) +{ + semian_shm_object *ptr = v_ptr; + if (-1 == ptr->semid){ + rb_raise(eInternal, "semid not set, errno %d: (%s)", errno, strerror(errno)); + } + if (1 != ptr->lock_count || -1 != semop(ptr->semid, &increment, 1)) { // No need for semtimedop + ptr->lock_count -= 1; + } else { + rb_raise(eInternal, "error unlocking semaphore, %d (%s)", errno, strerror(errno)); + } + return (void *)Qtrue; +} + +/* + * Wrap the lock-unlock functionality in ensures + */ + +typedef struct { // Workaround rb_ensure only allows one argument for each callback function + int pre_block_lock_count_state; + semian_shm_object *ptr; +} lock_status; + +static VALUE +semian_shm_object_synchronize_with_block(VALUE self) +{ + semian_shm_object_synchronize_memory_and_size(self, Qfalse); + if (!rb_block_given_p()) + return Qnil; + return rb_yield(Qnil); +} + +static VALUE +semian_shm_object_synchronize_restore_lock_status(VALUE v_status) +{ + lock_status *status = (lock_status *) v_status; + int tries = 0; + while (++tries < kSHMRestoreLockStateRetryCount && status->ptr->lock_count > status->pre_block_lock_count_state) + return (VALUE) WITHOUT_GVL(semian_shm_object_unlock_without_gvl, (void *)(status->ptr), RUBY_UBF_IO, NULL); + if (tries >= kSHMRestoreLockStateRetryCount) + rb_raise(eSyscall, "Failed to restore lock status after %d tries", kSHMRestoreLockStateRetryCount); + tries = 0; + while (++tries < kSHMRestoreLockStateRetryCount && status->ptr->lock_count < status->pre_block_lock_count_state) + return (VALUE) WITHOUT_GVL(semian_shm_object_lock_without_gvl, (void *)(status->ptr), RUBY_UBF_IO, NULL); + if (tries >= kSHMRestoreLockStateRetryCount) + rb_raise(eSyscall, "Failed to restore lock status after %d tries", kSHMRestoreLockStateRetryCount); + return Qnil; +} + +VALUE +semian_shm_object_synchronize(VALUE self) { // receives a block + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + lock_status status = { ptr->lock_count, ptr }; + WITHOUT_GVL(semian_shm_object_lock_without_gvl, (void *)ptr, RUBY_UBF_IO, NULL); + return rb_ensure(semian_shm_object_synchronize_with_block, self, semian_shm_object_synchronize_restore_lock_status, (VALUE)&status); +} + +void +define_method_with_synchronize(VALUE klass, const char *name, VALUE (*func)(ANYARGS), int argc) +{ + rb_define_method(klass, name, func, argc); + rb_funcall(klass, rb_intern("do_with_sync"), 1, rb_str_new2(name)); +} + +/* + * Memory functions + */ + +VALUE +semian_shm_object_synchronize_memory_and_size(VALUE self, VALUE is_master_obj) { + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + + struct shmid_ds shm_info = { }; + const int SHMMIN = 1; // minimum size of shared memory on linux + key_t key = ptr->key; + + int is_master = RTEST(is_master_obj); // Controls whether synchronization is master or slave (both fast-forward, only master resizes/initializes) + is_master |= (-1 == ptr->shmid) && (0 == ptr->shm_address); + + int shmid_out_of_sync = 0; + shmid_out_of_sync |= (-1 == ptr->shmid) && (0 == ptr->shm_address); // If not attached at all + if ((-1 != ptr->shmid) && (-1 != shmctl(ptr->shmid, IPC_STAT, &shm_info))) { + shmid_out_of_sync |= shm_info.shm_perm.mode & SHM_DEST && // If current attached memory is marked for deletion + ptr->shmid != shmget(key, SHMMIN, IPC_CREAT | ptr->permissions); // If shmid not in sync + } + + size_t requested_byte_size = ptr->byte_size; + int first_sync = (-1 == ptr->shmid) && (shmid_out_of_sync); + + if (shmid_out_of_sync) { // We need to fast-forward to the current state and memory attachment + semian_shm_object_cleanup_memory(self); + if ((-1 == (ptr->shmid = shmget(key, SHMMIN, ptr->permissions)))) { + if ((-1 == (ptr->shmid = shmget(key, ptr->byte_size, IPC_CREAT | IPC_EXCL | ptr->permissions)))) { + rb_raise(eSyscall, "shmget failed to create or attach current memory with key %d shmid %d errno %d (%s)", key, ptr->shmid, errno, strerror(errno)); + } // If we can neither create a new memory block nor get the current one with a key, something's wrong + } + if ((void *)-1 == (ptr->shm_address = shmat(ptr->shmid, NULL, 0))) { + ptr->shm_address = NULL; + rb_raise(eSyscall, "shmat failed to mount current memory with key %d shmid %d errno %d (%s)", key, ptr->shmid, errno, strerror(errno)); + } + } + + if (-1 == shmctl(ptr->shmid, IPC_STAT, &shm_info)){ + rb_raise(eSyscall, "shmctl failed to inspect current memory with key %d shmid %d errno %d (%s)", key, ptr->shmid, errno, strerror(errno)); + } + ptr->byte_size = shm_info.shm_segsz; + + int old_mem_attach_count = shm_info.shm_nattch; + + if (is_master) { + if (ptr->byte_size == requested_byte_size && first_sync && 1 == old_mem_attach_count) { + ptr->initialize_memory(ptr->byte_size, ptr->shm_address, NULL, 0); // We clear the memory if worker is first to attach + } else if (ptr->byte_size != requested_byte_size) { + void *old_shm_address = ptr->shm_address; + size_t old_byte_size = ptr->byte_size; + unsigned char old_memory_content[old_byte_size]; // It is unsafe to use malloc here to store a copy of the memory + memcpy(old_memory_content, old_shm_address, old_byte_size); + semian_shm_object_cleanup_memory(self); // This may fail + + if (-1 == (ptr->shmid = shmget(key, requested_byte_size, IPC_CREAT | IPC_EXCL | ptr->permissions))) { + rb_raise(eSyscall, "shmget failed to create new resized memory with key %d shmid %d errno %d (%s)", key, ptr->shmid, errno, strerror(errno)); + } + if ((void *)-1 == (ptr->shm_address = shmat(ptr->shmid, NULL, 0))) { + rb_raise(eSyscall, "shmat failed to mount new resized memory with key %d shmid %d errno %d (%s)", key, ptr->shmid, errno, strerror(errno)); + } + ptr->byte_size = requested_byte_size; + + ptr->initialize_memory(ptr->byte_size, ptr->shm_address, old_memory_content, old_byte_size); + } + } + return self; +} + +static VALUE +semian_shm_object_cleanup_memory_inner(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + + if (0 != ptr->shm_address && -1 == shmdt(ptr->shm_address)) { + rb_raise(eSyscall,"shmdt: no attached memory at %p, errno %d (%s)", ptr->shm_address, errno, strerror(errno)); + } + ptr->shm_address = 0; + + if (-1 != ptr->shmid && -1 == shmctl(ptr->shmid, IPC_RMID, 0)) { + if (errno != EINVAL) + rb_raise(eSyscall,"shmctl: error flagging memory for removal with shmid %d, errno %d (%s)", ptr->shmid, errno, strerror(errno)); + } + ptr->shmid = -1; + return Qnil; +} + +VALUE +semian_shm_object_cleanup_memory(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + lock_status status = { ptr->lock_count, ptr }; + WITHOUT_GVL(semian_shm_object_lock_without_gvl, (void *)ptr, RUBY_UBF_IO, NULL); + return rb_ensure(semian_shm_object_cleanup_memory_inner, self, semian_shm_object_synchronize_restore_lock_status, (VALUE)&status); +} + +static VALUE +semian_shm_object_semid(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (-1 == ptr->semid) + return -1; + semian_shm_object_synchronize(self); + return INT2NUM(ptr->semid); +} +static VALUE +semian_shm_object_shmid(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + return INT2NUM(ptr->shmid); +} + +void +Init_semian_shm_object (void) { + + VALUE cSemianModule = rb_const_get(rb_cObject, rb_intern("Semian")); + VALUE cSysVSharedMemory = rb_const_get(cSemianModule, rb_intern("SysVSharedMemory")); + + rb_define_method(cSysVSharedMemory, "acquire_memory_object", semian_shm_object_acquire, 3); + rb_define_method(cSysVSharedMemory, "destroy", semian_shm_object_destroy, 0); + rb_define_method(cSysVSharedMemory, "synchronize", semian_shm_object_synchronize, 0); + + rb_define_method(cSysVSharedMemory, "semid", semian_shm_object_semid, 0); + define_method_with_synchronize(cSysVSharedMemory, "shmid", semian_shm_object_shmid, 0); + + rb_define_singleton_method(cSysVSharedMemory, "replace_alloc", semian_shm_object_replace_alloc, 1); + + decrement.sem_num = kSHMIndexTicketLock; + decrement.sem_op = -1; + decrement.sem_flg = SEM_UNDO; + + increment.sem_num = kSHMIndexTicketLock; + increment.sem_op = 1; + increment.sem_flg = SEM_UNDO; + + Init_semian_integer(); + Init_semian_sliding_window(); +} diff --git a/ext/semian/semian_shared_memory_object.h b/ext/semian/semian_shared_memory_object.h new file mode 100644 index 00000000..7639a9a6 --- /dev/null +++ b/ext/semian/semian_shared_memory_object.h @@ -0,0 +1,36 @@ +#include "semian.h" + +typedef struct { + //semaphore, shared memory data and pointer + key_t key; + size_t byte_size; + int lock_count; // lock only done from 0 -> 1, unlock only done from 1 -> 0, so we can 'lock' multiple times (such as in nesting functions) without actually locking + int permissions; + int semid; + int shmid; + void (*initialize_memory)(size_t byte_size, void *dest, void *prev_data, size_t prev_data_byte_size); + void *shm_address; +} semian_shm_object; + +extern const rb_data_type_t +semian_shm_object_type; + +/* + * Headers + */ + +VALUE semian_shm_object_sizeof(VALUE klass, VALUE type); +VALUE semian_shm_object_replace_alloc(VALUE klass, VALUE target); + +VALUE semian_shm_object_acquire(VALUE self, VALUE id, VALUE byte_size, VALUE permissions); +VALUE semian_shm_object_destroy(VALUE self); +VALUE semian_shm_object_acquire_semaphore (VALUE self); +VALUE semian_shm_object_delete_semaphore(VALUE self); +VALUE semian_shm_object_cleanup_memory (VALUE self); +VALUE semian_shm_object_synchronize_memory_and_size (VALUE self, VALUE is_master); + +VALUE semian_shm_object_synchronize(VALUE self); +void define_method_with_synchronize(VALUE klass, const char *name, VALUE (*func)(ANYARGS), int argc); + +void Init_semian_integer (void); +void Init_semian_sliding_window (void); diff --git a/ext/semian/semian_sliding_window.c b/ext/semian/semian_sliding_window.c new file mode 100644 index 00000000..2162d86e --- /dev/null +++ b/ext/semian/semian_sliding_window.c @@ -0,0 +1,257 @@ +#include "semian_shared_memory_object.h" + +typedef struct { + int max_window_size; + int window_size; + long window[]; +} semian_sliding_window; + +static void semian_sliding_window_initialize_memory (size_t byte_size, void *dest, void *prev_data, size_t prev_data_byte_size); +static VALUE semian_sliding_window_bind_initialize_memory_callback(VALUE self); +static VALUE semian_sliding_window_size(VALUE self); +static VALUE semian_sliding_window_max_size(VALUE self); +static VALUE semian_sliding_window_push_back(VALUE self, VALUE num); +static VALUE semian_sliding_window_pop_back(VALUE self); +static VALUE semian_sliding_window_push_front(VALUE self, VALUE num); +static VALUE semian_sliding_window_pop_front(VALUE self); +static VALUE semian_sliding_window_clear(VALUE self); +static VALUE semian_sliding_window_first(VALUE self); +static VALUE semian_sliding_window_last(VALUE self); +static VALUE semian_sliding_window_resize_to(VALUE self, VALUE size); + +static void +semian_sliding_window_initialize_memory (size_t byte_size, void *dest, void *prev_data, size_t prev_data_byte_size) +{ + semian_sliding_window *ptr = dest; + semian_sliding_window *old = prev_data; + + if (prev_data) { + ptr->max_window_size = (byte_size - 2 * sizeof(int)) / sizeof(long); + ptr->window_size = fmin(ptr->max_window_size, old->window_size); + + // Copy the most recent ptr->shm_address->window_size numbers to new memory + memcpy(&(ptr->window), + ((long *)(&(old->window[0]))) + old->window_size - ptr->window_size, + ptr->window_size * sizeof(long)); + } else { + semian_sliding_window *data = dest; + data->max_window_size = (byte_size - 2 * sizeof(int)) / sizeof(long); + data->window_size = 0; + for (int i = 0; i < data->window_size; ++i) + data->window[i] = 0; + } +} + +static VALUE +semian_sliding_window_bind_initialize_memory_callback(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + ptr->initialize_memory = &semian_sliding_window_initialize_memory; + return self; +} + +static VALUE +semian_sliding_window_size(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + int window_size = ((semian_sliding_window *)(ptr->shm_address))->window_size; + return INT2NUM(window_size); +} + +static VALUE +semian_sliding_window_max_size(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + int max_length = ((semian_sliding_window *)(ptr->shm_address))->max_window_size; + return INT2NUM(max_length); +} + +static VALUE +semian_sliding_window_push_back(VALUE self, VALUE num) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + if (TYPE(num) != T_FIXNUM && TYPE(num) != T_FLOAT && TYPE(num) != T_BIGNUM) + return Qnil; + + semian_sliding_window *data = ptr->shm_address; + if (data->window_size == data->max_window_size) { + for (int i = 1; i < data->max_window_size; ++i){ + data->window[i - 1] = data->window[i]; + } + --(data->window_size); + } + data->window[(data->window_size)] = NUM2LONG(num); + ++(data->window_size); + return self; +} + +static VALUE +semian_sliding_window_pop_back(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + + VALUE retval; + semian_sliding_window *data = ptr->shm_address; + if (0 == data->window_size) + retval = Qnil; + else { + retval = LONG2NUM(data->window[data->window_size - 1]); + --(data->window_size); + } + return retval; +} + +static VALUE +semian_sliding_window_push_front(VALUE self, VALUE num) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + if (TYPE(num) != T_FIXNUM && TYPE(num) != T_FLOAT && TYPE(num) != T_BIGNUM) + return Qnil; + + long val = NUM2LONG(num); + semian_sliding_window *data = ptr->shm_address; + + for (int i=data->window_size; i > 0; --i) + data->window[i] = data->window[i - 1]; + + data->window[0] = val; + ++(data->window_size); + if (data->window_size > data->max_window_size) + data->window_size=data->max_window_size; + + return self; +} + +static VALUE +semian_sliding_window_pop_front(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + + VALUE retval; + semian_sliding_window *data = ptr->shm_address; + if (0 >= data->window_size) + retval = Qnil; + else { + retval = LONG2NUM(data->window[0]); + for (int i = 0; i < data->window_size - 1; ++i) + data->window[i] = data->window[i + 1]; + --(data->window_size); + } + + return retval; +} + +static VALUE +semian_sliding_window_clear(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + semian_sliding_window *data = ptr->shm_address; + data->window_size = 0; + + return self; +} + +static VALUE +semian_sliding_window_first(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + + VALUE retval; + semian_sliding_window *data = ptr->shm_address; + if (data->window_size >= 1) + retval = LONG2NUM(data->window[0]); + else + retval = Qnil; + + return retval; +} + +static VALUE +semian_sliding_window_last(VALUE self) +{ + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + if (0 == ptr->shm_address) + return Qnil; + + VALUE retval; + semian_sliding_window *data = ptr->shm_address; + if (data->window_size > 0) + retval = LONG2NUM(data->window[data->window_size - 1]); + else + retval = Qnil; + + return retval; +} + +static VALUE +semian_sliding_window_resize_to(VALUE self, VALUE size) { + semian_shm_object *ptr; + TypedData_Get_Struct(self, semian_shm_object, &semian_shm_object_type, ptr); + + if (TYPE(size) != T_FIXNUM && TYPE(size) != T_FLOAT) + return Qnil; + if (NUM2INT(size) <= 0) + rb_raise(rb_eArgError, "size must be larger than 0"); + + ptr->byte_size = 2 * sizeof(int) + NUM2INT(size) * sizeof(long); + semian_shm_object_synchronize_memory_and_size(self, Qtrue); + + return self; +} + +static VALUE +semian_sliding_window_calculate_byte_size(VALUE klass, VALUE size) +{ + return INT2NUM(2 * sizeof(int) + NUM2INT(size) * sizeof(long)); +} + +void +Init_semian_sliding_window (void) +{ + VALUE cSemianModule = rb_const_get(rb_cObject, rb_intern("Semian")); + VALUE cSysVModule = rb_const_get(cSemianModule, rb_intern("SysV")); + VALUE cSysVSharedMemory = rb_const_get(cSemianModule, rb_intern("SysVSharedMemory")); + VALUE cSlidingWindow = rb_const_get(cSysVModule, rb_intern("SlidingWindow")); + + semian_shm_object_replace_alloc(cSysVSharedMemory, cSlidingWindow); + + rb_define_private_method(cSlidingWindow, "bind_initialize_memory_callback", semian_sliding_window_bind_initialize_memory_callback, 0); + define_method_with_synchronize(cSlidingWindow, "size", semian_sliding_window_size, 0); + define_method_with_synchronize(cSlidingWindow, "max_size", semian_sliding_window_max_size, 0); + define_method_with_synchronize(cSlidingWindow, "resize_to", semian_sliding_window_resize_to, 1); + define_method_with_synchronize(cSlidingWindow, "<<", semian_sliding_window_push_back, 1); + define_method_with_synchronize(cSlidingWindow, "push", semian_sliding_window_push_back, 1); + define_method_with_synchronize(cSlidingWindow, "pop", semian_sliding_window_pop_back, 0); + define_method_with_synchronize(cSlidingWindow, "shift", semian_sliding_window_pop_front, 0); + define_method_with_synchronize(cSlidingWindow, "unshift", semian_sliding_window_push_front, 1); + define_method_with_synchronize(cSlidingWindow, "clear", semian_sliding_window_clear, 0); + define_method_with_synchronize(cSlidingWindow, "first", semian_sliding_window_first, 0); + define_method_with_synchronize(cSlidingWindow, "last", semian_sliding_window_last, 0); + rb_define_method(cSlidingWindow, "calculate_byte_size", semian_sliding_window_calculate_byte_size, 1); +} diff --git a/lib/semian.rb b/lib/semian.rb index 7c3255da..a8865131 100644 --- a/lib/semian.rb +++ b/lib/semian.rb @@ -78,7 +78,7 @@ module Semian OpenCircuitError = Class.new(BaseError) def semaphores_enabled? - !ENV['SEMIAN_SEMAPHORES_DISABLED'] + !ENV['SEMIAN_SEMAPHORES_DISABLED'] && Semian.sysv_semaphores_supported? end module AdapterError @@ -119,10 +119,13 @@ def to_s # Returns the registered resource. def register(name, tickets:, permissions: 0660, timeout: 0, error_threshold:, error_timeout:, success_threshold:, exceptions: []) circuit_breaker = CircuitBreaker.new( + name, success_threshold: success_threshold, error_threshold: error_threshold, error_timeout: error_timeout, exceptions: Array(exceptions) + [::Semian::BaseError], + permissions: permissions, + implementation: Semian.semaphores_enabled? ? ::Semian::SysV : ::Semian::Simple, ) resource = Resource.new(name, tickets: tickets, permissions: permissions, timeout: timeout) resources[name] = ProtectedResource.new(resource, circuit_breaker) @@ -154,7 +157,14 @@ def resources require 'semian/protected_resource' require 'semian/unprotected_resource' require 'semian/platform' -if Semian.sysv_semaphores_supported? && Semian.semaphores_enabled? +require 'semian/simple_sliding_window' +require 'semian/simple_integer' +require 'semian/simple_state' +if Semian.semaphores_enabled? + require 'semian/sysv_shared_memory' + require 'semian/sysv_sliding_window' + require 'semian/sysv_integer' + require 'semian/sysv_state' require 'semian/semian' else Semian::MAX_TICKETS = 0 @@ -162,7 +172,7 @@ def resources Semian.logger.info("Semian sysv semaphores are not supported on #{RUBY_PLATFORM} - all operations will no-op") end - unless Semian.semaphores_enabled? + if ENV['SEMIAN_SEMAPHORES_DISABLED'] Semian.logger.info("Semian semaphores are disabled, is this what you really want? - all operations will no-op") end end diff --git a/lib/semian/circuit_breaker.rb b/lib/semian/circuit_breaker.rb index cc78b6ce..cb0c8581 100644 --- a/lib/semian/circuit_breaker.rb +++ b/lib/semian/circuit_breaker.rb @@ -1,13 +1,21 @@ module Semian - class CircuitBreaker - attr_reader :state + class CircuitBreaker #:nodoc: + extend Forwardable - def initialize(exceptions:, success_threshold:, error_threshold:, error_timeout:) + def initialize(name, exceptions:, success_threshold:, error_threshold:, error_timeout:, permissions:, implementation:) + @name = name.to_sym @success_count_threshold = success_threshold @error_count_threshold = error_threshold @error_timeout = error_timeout @exceptions = exceptions - reset + + @errors = implementation::SlidingWindow.new(max_size: @error_count_threshold, + name: "#{name}_sliding_window", + permissions: permissions) + @successes = implementation::Integer.new(name: "#{name}_integer", + permissions: permissions) + @state = implementation::State.new(name: "#{name}_state", + permissions: permissions) end def acquire @@ -32,8 +40,7 @@ def request_allowed? end def mark_failed(_error) - push_time(@errors, @error_count_threshold, duration: @error_timeout) - + push_time(@errors, duration: @error_timeout) if closed? open if error_threshold_reached? elsif half_open? @@ -43,70 +50,70 @@ def mark_failed(_error) def mark_success return unless half_open? - @successes += 1 + @successes.increment close if success_threshold_reached? end def reset - @errors = [] - @successes = 0 + @errors.clear + @successes.reset close end + def destroy + @errors.destroy + @successes.destroy + @state.destroy + end + private - def closed? - state == :closed - end + def_delegators :@state, :closed?, :open?, :half_open? + private :closed?, :open?, :half_open? def close log_state_transition(:closed) - @state = :closed - @errors = [] - end - - def open? - state == :open + @state.close + @errors.clear end def open log_state_transition(:open) - @state = :open - end - - def half_open? - state == :half_open + @state.open end def half_open log_state_transition(:half_open) - @state = :half_open - @successes = 0 + @state.half_open + @successes.reset end def success_threshold_reached? - @successes >= @success_count_threshold + @successes.value >= @success_count_threshold end def error_threshold_reached? - @errors.count == @error_count_threshold + @errors.size == @error_count_threshold end def error_timeout_expired? - @errors.last && (@errors.last + @error_timeout < Time.now) + time_ms = @errors.last + time_ms && (Time.at(time_ms / 1000) + @error_timeout < Time.now) end - def push_time(window, max_size, duration:, time: Time.now) - window.shift while window.first && window.first + duration < time - window.shift if window.size == max_size - window << time + def push_time(window, duration:, time: Time.now) + # The sliding window stores the integer amount of milliseconds since epoch as a timestamp + @errors.synchronize do + window.shift while window.first && window.first / 1000 + duration < time.to_i + window << (time.to_f * 1000).to_i + end end def log_state_transition(new_state) - return if @state.nil? || new_state == @state + return if @state.nil? || new_state == @state.value - str = "[#{self.class.name}] State transition from #{@state} to #{new_state}." - str << " success_count=#{@successes} error_count=#{@errors.count}" + str = "[#{self.class.name}] State transition from #{@state.value} to #{new_state}." + str << " success_count=#{@successes.value} error_count=#{@errors.size}" str << " success_count_threshold=#{@success_count_threshold} error_count_threshold=#{@error_count_threshold}" str << " error_timeout=#{@error_timeout} error_last_at=\"#{@error_last_at}\"" Semian.logger.info(str) diff --git a/lib/semian/protected_resource.rb b/lib/semian/protected_resource.rb index 24a0e2f9..71ac8a6f 100644 --- a/lib/semian/protected_resource.rb +++ b/lib/semian/protected_resource.rb @@ -5,13 +5,18 @@ class ProtectedResource extend Forwardable def_delegators :@resource, :destroy, :count, :semid, :tickets, :name - def_delegators :@circuit_breaker, :reset, :mark_failed, :request_allowed? + def_delegators :@circuit_breaker, :reset, :mark_failed, :mark_success, :request_allowed? def initialize(resource, circuit_breaker) @resource = resource @circuit_breaker = circuit_breaker end + def destroy + @resource.destroy + @circuit_breaker.destroy + end + def acquire(timeout: nil, scope: nil, adapter: nil) @circuit_breaker.acquire do begin diff --git a/lib/semian/simple_integer.rb b/lib/semian/simple_integer.rb new file mode 100644 index 00000000..cb4403e9 --- /dev/null +++ b/lib/semian/simple_integer.rb @@ -0,0 +1,23 @@ +module Semian + module Simple + class Integer #:nodoc: + attr_accessor :value + + def initialize(**) + reset + end + + def increment(val = 1) + @value += val + end + + def reset + @value = 0 + end + + def destroy + reset + end + end + end +end diff --git a/lib/semian/simple_sliding_window.rb b/lib/semian/simple_sliding_window.rb new file mode 100644 index 00000000..bfd57e54 --- /dev/null +++ b/lib/semian/simple_sliding_window.rb @@ -0,0 +1,45 @@ +require 'forwardable' + +module Semian + module Simple + class SlidingWindow #:nodoc: + extend Forwardable + + def_delegators :@window, :size, :pop, :shift, :first, :last + attr_reader :max_size + + # A sliding window is a structure that stores the most @max_size recent timestamps + # like this: if @max_size = 4, current time is 10, @window =[5,7,9,10]. + # Another push of (11) at 11 sec would make @window [7,9,10,11], shifting off 5. + + def initialize(max_size:, **) + @max_size = max_size + @window = [] + end + + def resize_to(size) + raise ArgumentError.new('size must be larger than 0') if size < 1 + @max_size = size + @window.shift while @window.size > @max_size + self + end + + def push(value) + @window.shift while @window.size >= @max_size + @window << value + self + end + + alias_method :<<, :push + + def clear + @window.clear + self + end + + def destroy + clear + end + end + end +end diff --git a/lib/semian/simple_state.rb b/lib/semian/simple_state.rb new file mode 100644 index 00000000..eff76a98 --- /dev/null +++ b/lib/semian/simple_state.rb @@ -0,0 +1,44 @@ +module Semian + module Simple + class State #:nodoc: + def initialize(**) + reset + end + + attr_accessor :value + private :value= + + def open? + value == :open + end + + def closed? + value == :closed + end + + def half_open? + value == :half_open + end + + def open + self.value = :open + end + + def close + self.value = :closed + end + + def half_open + self.value = :half_open + end + + def reset + close + end + + def destroy + reset + end + end + end +end diff --git a/lib/semian/sysv_integer.rb b/lib/semian/sysv_integer.rb new file mode 100644 index 00000000..15a4b7eb --- /dev/null +++ b/lib/semian/sysv_integer.rb @@ -0,0 +1,11 @@ +module Semian + module SysV + class Integer < Semian::Simple::Integer #:nodoc: + include SysVSharedMemory + + def initialize(name:, permissions:) + acquire_memory_object(name, calculate_byte_size, permissions) + end + end + end +end diff --git a/lib/semian/sysv_shared_memory.rb b/lib/semian/sysv_shared_memory.rb new file mode 100644 index 00000000..dbbbbae1 --- /dev/null +++ b/lib/semian/sysv_shared_memory.rb @@ -0,0 +1,53 @@ +module Semian + module SysVSharedMemory #:nodoc: + module SysVSynchronizeHelper + # This is a helper method for wrapping a method in :synchronize + # Its usage is to be called from C: where rb_define_method() is originally + # used, define_method_with_synchronize() is used instead, which calls this + def do_with_sync(*names) + names.each do |name| + new_name = "#{name}_inner" + alias_method new_name, name + private new_name + define_method(name) do |*args, &block| + synchronize do + method(new_name).call(*args, &block) + end + end + end + end + end + + extend SysVSynchronizeHelper + + def self.included(base) + base.extend(SysVSynchronizeHelper) + end + + def semid + -1 + end + + def shmid + -1 + end + + def synchronize + yield if block_given? + end + + def destroy + super + end + + private + + def acquire_memory_object(*) + raise NotImplementedError + end + + def bind_initialize_memory_callback + raise NotImplementedError + end + end +end diff --git a/lib/semian/sysv_sliding_window.rb b/lib/semian/sysv_sliding_window.rb new file mode 100644 index 00000000..b5cc1187 --- /dev/null +++ b/lib/semian/sysv_sliding_window.rb @@ -0,0 +1,11 @@ +module Semian + module SysV + class SlidingWindow < Semian::Simple::SlidingWindow #:nodoc: + include SysVSharedMemory + + def initialize(max_size:, name:, permissions:) + acquire_memory_object(name, calculate_byte_size(max_size), permissions) + end + end + end +end diff --git a/lib/semian/sysv_state.rb b/lib/semian/sysv_state.rb new file mode 100644 index 00000000..fc338a9a --- /dev/null +++ b/lib/semian/sysv_state.rb @@ -0,0 +1,31 @@ +require 'forwardable' + +module Semian + module SysV + class State < Semian::Simple::State #:nodoc: + include SysVSharedMemory + extend Forwardable + + SYM_TO_NUM = {closed: 0, open: 1, half_open: 2}.freeze + NUM_TO_SYM = SYM_TO_NUM.invert.freeze + + def_delegators :@integer, :semid, :shmid, :synchronize, :acquire_memory_object, + :bind_initialize_memory_callback, :destroy + private :acquire_memory_object, :bind_initialize_memory_callback + + def initialize(name:, permissions:) + @integer = Semian::SysV::Integer.new(name: name, permissions: permissions) + end + + def value + NUM_TO_SYM.fetch(@integer.value) { raise ArgumentError } + end + + private + + def value=(sym) + @integer.value = SYM_TO_NUM.fetch(sym) { raise ArgumentError } + end + end + end +end diff --git a/lib/semian/unprotected_resource.rb b/lib/semian/unprotected_resource.rb index a8c18599..3faa1bea 100644 --- a/lib/semian/unprotected_resource.rb +++ b/lib/semian/unprotected_resource.rb @@ -36,5 +36,8 @@ def request_allowed? def mark_failed(_error) end + + def mark_success + end end end diff --git a/test/circuit_breaker_test.rb b/test/circuit_breaker_test.rb index 5fcb904e..441a2e88 100644 --- a/test/circuit_breaker_test.rb +++ b/test/circuit_breaker_test.rb @@ -86,6 +86,64 @@ def test_sparse_errors_dont_open_circuit Semian.destroy(:three) end + def test_shared_error_threshold_between_workers_to_open + # only valid if there's persistence + begin + Semian.destroy(:testing) + rescue + nil + end + Semian.register(:testing, tickets: 1, exceptions: [SomeError], error_threshold: 10, error_timeout: 5, success_threshold: 4) + @resource = Semian[:testing] + 10.times do + fork do + @resource.mark_failed SomeError + end + end + Process.waitall + assert_circuit_opened + end + + def test_shared_success_threshold_between_workers_to_close + test_shared_error_threshold_between_workers_to_open + Timecop.travel(6) + @resource = Semian[:testing] + 5.times do + fork do + @resource.mark_success + end + end + Process.waitall + assert_circuit_closed + end + + def test_shared_fresh_worker_killed_should_not_reset_circuit_breaker_data + # Won't reset if at least one worker is still attached to it. + begin + Semian.destroy(:testing) + rescue + nil + end + + reader, writer = IO.pipe + pid = fork do + reader.close + Semian.register(:unique_res, tickets: 1, exceptions: [SomeError], error_threshold: 2, error_timeout: 5, success_threshold: 1) + resource_inner = Semian[:unique_res] + resource_inner.reset + open_circuit! resource_inner + writer.puts "Done" + writer.close + sleep + end + + reader.gets + Semian.register(:unique_res, tickets: 1, exceptions: [SomeError], error_threshold: 2, error_timeout: 5, success_threshold: 1) + Process.kill(9, pid) + Process.waitall + assert_circuit_opened Semian[:unique_res] + end + private def open_circuit!(resource = @resource) diff --git a/test/resource_test.rb b/test/resource_test.rb index fa48c606..f9fd2cf0 100644 --- a/test/resource_test.rb +++ b/test/resource_test.rb @@ -108,19 +108,29 @@ def test_acquire_timeout_override end def test_acquire_with_fork - resource = create_resource :testing, tickets: 2, timeout: 0.5 - - resource.acquire do - fork do - resource.acquire do - assert_raises Semian::TimeoutError do - resource.acquire {} - end + pids = [] + + 2.times do + reader, writer = IO.pipe + pids << fork do + create_resource(:testing, tickets: 2, timeout: 0.5).acquire do + reader.close + writer.puts "Acquired" + writer.close + sleep end end + reader.gets + end + + assert_raises Semian::TimeoutError do + create_resource(:testing, tickets: 2, timeout: 0.5).acquire {} + end - Process.wait + pids.each do |pid| + Process.kill(9, pid) end + Process.waitall end def test_acquire_releases_on_kill diff --git a/test/simple_integer_test.rb b/test/simple_integer_test.rb new file mode 100644 index 00000000..5e52c194 --- /dev/null +++ b/test/simple_integer_test.rb @@ -0,0 +1,49 @@ +require 'test_helper' + +class TestSimpleInteger < MiniTest::Unit::TestCase + KLASS = ::Semian::Simple::Integer + + def setup + @integer = KLASS.new + end + + def teardown + @integer.destroy + end + + module IntegerTestCases + def test_access_value + assert_equal(0, @integer.value) + @integer.value = 99 + assert_equal(99, @integer.value) + time_now = (Time.now).to_i + @integer.value = time_now + assert_equal(time_now, @integer.value) + @integer.value = 6 + assert_equal(6, @integer.value) + @integer.value = 6 + assert_equal(6, @integer.value) + end + + def test_increment + @integer.increment(4) + assert_equal(4, @integer.value) + @integer.increment + assert_equal(5, @integer.value) + @integer.increment(-2) + assert_equal(3, @integer.value) + end + + def test_reset_on_init + assert_equal(0, @integer.value) + end + + def test_reset + @integer.increment(5) + @integer.reset + assert_equal(0, @integer.value) + end + end + + include IntegerTestCases +end diff --git a/test/simple_sliding_window_test.rb b/test/simple_sliding_window_test.rb new file mode 100644 index 00000000..11487180 --- /dev/null +++ b/test/simple_sliding_window_test.rb @@ -0,0 +1,65 @@ +require 'test_helper' + +class TestSimpleSlidingWindow < MiniTest::Unit::TestCase + KLASS = ::Semian::Simple::SlidingWindow + + def setup + @sliding_window = KLASS.new(max_size: 6) + @sliding_window.clear + end + + def teardown + @sliding_window.destroy + end + + module SlidingWindowTestCases + def test_sliding_window_push + assert_equal(0, @sliding_window.size) + @sliding_window << 1 + assert_sliding_window(@sliding_window, [1], 6) + @sliding_window << 5 + assert_sliding_window(@sliding_window, [1, 5], 6) + end + + def test_sliding_window_resize + assert_equal(0, @sliding_window.size) + @sliding_window << 1 << 2 << 3 << 4 << 5 << 6 + assert_sliding_window(@sliding_window, [1, 2, 3, 4, 5, 6], 6) + @sliding_window.resize_to 6 + assert_sliding_window(@sliding_window, [1, 2, 3, 4, 5, 6], 6) + @sliding_window.resize_to 5 + assert_sliding_window(@sliding_window, [2, 3, 4, 5, 6], 5) + @sliding_window.resize_to 6 + assert_sliding_window(@sliding_window, [2, 3, 4, 5, 6], 6) + end + + def test_sliding_window_edge_falloff + assert_equal(0, @sliding_window.size) + @sliding_window << 0 << 1 << 2 << 3 << 4 << 5 << 6 << 7 + assert_sliding_window(@sliding_window, [2, 3, 4, 5, 6, 7], 6) + @sliding_window.shift + assert_sliding_window(@sliding_window, [3, 4, 5, 6, 7], 6) + end + + def resize_to_less_than_1_raises + assert_raises ArgumentError do + @sliding_window.resize_to 0 + end + end + end + + module SlidingWindowUtilityMethods + def assert_sliding_window(sliding_window, array, max_size) + # Get private member, the sliding_window doesn't expose the entire array + data = sliding_window.instance_variable_get("@window") + assert_equal(array, data) + assert_equal(max_size, sliding_window.max_size) + end + end + + include SlidingWindowTestCases + + private + + include SlidingWindowUtilityMethods +end diff --git a/test/simple_state_test.rb b/test/simple_state_test.rb new file mode 100644 index 00000000..f05d86a2 --- /dev/null +++ b/test/simple_state_test.rb @@ -0,0 +1,45 @@ +require 'test_helper' + +class TestSimpleState < MiniTest::Unit::TestCase + KLASS = ::Semian::Simple::State + + def setup + @state = KLASS.new + end + + def teardown + @state.destroy + end + + module StateTestCases + def test_start_closed? + assert @state.closed? + end + + def test_open + @state.open + assert @state.open? + assert_equal @state.value, :open + end + + def test_close + @state.close + assert @state.closed? + assert_equal @state.value, :closed + end + + def test_half_open + @state.half_open + assert @state.half_open? + assert_equal @state.value, :half_open + end + + def test_reset + @state.reset + assert @state.closed? + assert_equal @state.value, :closed + end + end + + include StateTestCases +end diff --git a/test/sysv_integer_test.rb b/test/sysv_integer_test.rb new file mode 100644 index 00000000..1d140cb4 --- /dev/null +++ b/test/sysv_integer_test.rb @@ -0,0 +1,57 @@ +require 'test_helper' + +class TestSysVInteger < MiniTest::Unit::TestCase + KLASS = ::Semian::SysV::Integer + + def setup + @integer = KLASS.new(name: 'TestSysVInteger', permissions: 0660) + @integer.reset + end + + def teardown + @integer.destroy + end + + include TestSimpleInteger::IntegerTestCases + + def test_memory_is_shared + integer_2 = KLASS.new(name: 'TestSysVInteger', permissions: 0660) + integer_2.value = 100 + assert_equal 100, @integer.value + @integer.value = 200 + assert_equal 200, integer_2.value + @integer.value = 0 + assert_equal 0, integer_2.value + end + + def test_memory_not_reset_when_at_least_one_worker_using_it + @integer.value = 109 + integer_2 = KLASS.new(name: 'TestSysVInteger', permissions: 0660) + assert_equal @integer.value, integer_2.value + + reader, writer = IO.pipe + pid = fork do + reader.close + integer_3 = KLASS.new(name: 'TestSysVInteger', permissions: 0660) + assert_equal 109, integer_3.value + integer_3.value = 110 + writer.puts "Done" + writer.close + sleep + end + + reader.gets + Process.kill(9, pid) + assert_equal 110, integer_2.value + end + + def test_memory_reset_when_no_workers_using_it + fork do + integer = KLASS.new(name: 'TestSysVInteger_2', permissions: 0660) + integer.value = 109 + end + Process.waitall + @integer = KLASS.new(name: 'TestSysVInteger_2', permissions: 0660) + assert_equal 0, @integer.value + end +end diff --git a/test/sysv_sliding_window_test.rb b/test/sysv_sliding_window_test.rb new file mode 100644 index 00000000..aca43679 --- /dev/null +++ b/test/sysv_sliding_window_test.rb @@ -0,0 +1,166 @@ +require 'test_helper' + +class TestSysVSlidingWindow < MiniTest::Unit::TestCase + KLASS = ::Semian::SysV::SlidingWindow + + def setup + @sliding_window = KLASS.new(max_size: 6, + name: 'TestSysVSlidingWindow', + permissions: 0660) + @sliding_window.clear + end + + def teardown + @sliding_window.destroy + end + + include TestSimpleSlidingWindow::SlidingWindowTestCases + + def test_forcefully_killing_worker_holding_on_to_semaphore_releases_it + Timeout.timeout(1) do # assure dont hang + @sliding_window << 100 + assert_equal 100, @sliding_window.first + end + + reader, writer = IO.pipe + pid = fork do + reader.close + sliding_window_2 = KLASS.new(max_size: 6, + name: 'TestSysVSlidingWindow', + permissions: 0660) + sliding_window_2.synchronize do + writer.puts "Done" + writer.close + sleep + end + end + + reader.gets + Process.kill(9, pid) + Process.waitall + + Timeout.timeout(1) do # assure dont hang + @sliding_window << 100 + assert_equal(100, @sliding_window.first) + end + end + + def test_sliding_window_memory_is_actually_shared + assert_equal 0, @sliding_window.size + sliding_window_2 = KLASS.new(max_size: 6, + name: 'TestSysVSlidingWindow', + permissions: 0660) + assert_equal 0, sliding_window_2.size + + large_number = (Time.now.to_f * 1000).to_i + @sliding_window << large_number + assert_sliding_window(@sliding_window, [large_number], 6) + assert_sliding_windows_in_sync(@sliding_window, sliding_window_2) + sliding_window_2 << 6 << 4 << 3 << 2 + assert_sliding_window(@sliding_window, [large_number, 6, 4, 3, 2], 6) + assert_sliding_windows_in_sync(@sliding_window, sliding_window_2) + + @sliding_window.clear + assert_sliding_window(@sliding_window, [], 6) + assert_sliding_windows_in_sync(@sliding_window, sliding_window_2) + end + + def test_restarting_worker_should_not_reset_queue + @sliding_window << 10 << 20 << 30 + sliding_window_2 = KLASS.new(max_size: 6, + name: 'TestSysVSlidingWindow', + permissions: 0660) + assert_sliding_window(sliding_window_2, [10, 20, 30], 6) + sliding_window_2.pop + assert_sliding_window(sliding_window_2, [10, 20], 6) + assert_sliding_windows_in_sync(@sliding_window, sliding_window_2) + + sliding_window_3 = KLASS.new(max_size: 6, + name: 'TestSysVSlidingWindow', + permissions: 0660) + assert_sliding_window(sliding_window_3, [10, 20], 6) + sliding_window_3.pop + assert_sliding_window(@sliding_window, [10], 6) + assert_sliding_windows_in_sync(@sliding_window, sliding_window_2) + end + + def test_other_workers_automatically_switching_to_new_memory_resizing_up_or_down + # Test explicit resizing, and resizing through making new memory associations + + # B resize down through init + sliding_window_2 = KLASS.new(max_size: 4, + name: 'TestSysVSlidingWindow', + permissions: 0660) + sliding_window_2 << 80 << 90 << 100 << 110 << 120 + assert_sliding_window(sliding_window_2, [90, 100, 110, 120], 4) + assert_sliding_windows_in_sync(@sliding_window, sliding_window_2) + + # A explicit resize down, + @sliding_window.resize_to(2) + assert_sliding_windows_in_sync(@sliding_window, sliding_window_2) + assert_sliding_window(@sliding_window, [110, 120], 2) + + # B resize up through init + sliding_window_2 = KLASS.new(max_size: 4, + name: 'TestSysVSlidingWindow', + permissions: 0660) + assert_sliding_windows_in_sync(@sliding_window, sliding_window_2) + assert_sliding_window(@sliding_window, [110, 120], 4) + + # A explicit resize up + @sliding_window.resize_to(6) + @sliding_window << 130 + assert_sliding_windows_in_sync(@sliding_window, sliding_window_2) + assert_sliding_window(@sliding_window, [110, 120, 130], 6) + + # B resize down through init + sliding_window_2 = KLASS.new(max_size: 2, + name: 'TestSysVSlidingWindow', + permissions: 0660) + assert_sliding_windows_in_sync(@sliding_window, sliding_window_2) + assert_sliding_window(@sliding_window, [120, 130], 2) + + # A explicit resize up + @sliding_window.resize_to(4) + assert_sliding_windows_in_sync(@sliding_window, sliding_window_2) + assert_sliding_window(@sliding_window, [120, 130], 4) + + # B resize up through init + sliding_window_2 = KLASS.new(max_size: 6, + name: 'TestSysVSlidingWindow', + permissions: 0660) + assert_sliding_windows_in_sync(@sliding_window, sliding_window_2) + assert_sliding_window(@sliding_window, [120, 130], 6) + + # B resize, but no final size change + sliding_window_2 << 140 << 150 << 160 << 170 + sliding_window_2.resize_to(4) + sliding_window_2 << 180 + sliding_window_2.resize_to(6) + assert_sliding_window(@sliding_window, [150, 160, 170, 180], 6) + assert_sliding_windows_in_sync(@sliding_window, sliding_window_2) + end + + private + + def assert_sliding_window(sliding_window, array, max_size) + assert_correct_first_and_last_and_size(sliding_window, array.first, array.last, array.size, max_size) + end + + def assert_correct_first_and_last_and_size(sliding_window, first, last, size, max_size) + assert_equal(first, sliding_window.first) + assert_equal(last, sliding_window.last) + assert_equal(size, sliding_window.size) + assert_equal(max_size, sliding_window.max_size) + end + + def assert_sliding_windows_in_sync(sliding_window_1, sliding_window_2) + # it only exposes ends, size, and max_size, so can only check those + assert_equal(sliding_window_1.first, sliding_window_2.first) + assert_equal(sliding_window_1.last, sliding_window_2.last) + assert_equal(sliding_window_1.size, sliding_window_2.size) + assert_equal(sliding_window_1.max_size, sliding_window_2.max_size) + end + + include TestSimpleSlidingWindow::SlidingWindowUtilityMethods +end diff --git a/test/sysv_state_test.rb b/test/sysv_state_test.rb new file mode 100644 index 00000000..11219626 --- /dev/null +++ b/test/sysv_state_test.rb @@ -0,0 +1,45 @@ +require 'test_helper' + +class TestSysVState < MiniTest::Unit::TestCase + KLASS = ::Semian::SysV::State + + def setup + @state = KLASS.new(name: 'TestSysVState', + permissions: 0660) + @state.reset + end + + def teardown + @state.destroy + end + + include TestSimpleState::StateTestCases + + def test_memory_is_shared + assert_equal :closed, @state.value + @state.open + + state_2 = KLASS.new(name: 'TestSysVState', + permissions: 0660) + assert_equal :open, state_2.value + assert state_2.open? + end + + def test_will_throw_error_when_invalid_symbol_given + # May occur if underlying integer gets into bad state + integer = @state.instance_eval "@integer" + integer.value = 100 + assert_raises ArgumentError do + @state.value + end + assert_raises ArgumentError do + @state.open? + end + assert_raises ArgumentError do + @state.half_open? + end + assert_raises ArgumentError do + @state.closed? + end + end +end diff --git a/test/test_helper.rb b/test/test_helper.rb index 6209a0ec..3c34f825 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -6,6 +6,7 @@ require 'timecop' require 'tempfile' require 'fileutils' +require 'timeout' require 'helpers/background_helper'