diff --git a/ext/hermann/hermann_lib.c b/ext/hermann/hermann_lib.c index e84b4a5..b2ea55c 100644 --- a/ext/hermann/hermann_lib.c +++ b/ext/hermann/hermann_lib.c @@ -80,6 +80,38 @@ void fprintf_hermann_instance_config(HermannInstanceConfig *config, topic, brokers, partition, isInitialized, isRkSet, isRktSet ); } +static void hermann_signal_handler(int signum) { + /* Set our global run state to false */ + KLAXON = 1; // We're going down + + /* Invoke Ruby's handler */ +#ifdef TRACE + fprintf(stderr, "hermann_signal_handler invoked with signal %d\n", signum); +#endif + ruby_vm_sighandler(signum); +} + +/** + * During processing loops, unless we have access to rb_blocking_thread, we need to include + * our own signal handler for interrupts. This will allow us to detect sigint and stop the run + * loops, primarily for consumers. + */ +static void hook_into_sighandler_chain() { + struct sigaction our_signal_handler_def; + struct sigaction old_signal_handler_def; + + /* retrieve and store Ruby's signal handler */ + sigaction(SIGINT, NULL, &old_signal_handler_def); + ruby_vm_sighandler = old_signal_handler_def.sa_handler; + + /* set our handler */ + memset(&our_signal_handler_def, 0, sizeof(our_signal_handler_def)); + our_signal_handler_def.sa_handler = hermann_signal_handler; + sigemptyset(&our_signal_handler_def.sa_mask); + sigaction(SIGINT, &our_signal_handler_def, NULL); + +} + /** * Message delivery report callback. * Called once for each message. @@ -364,7 +396,7 @@ void consumer_consume_loop(HermannInstanceConfig* consumerConfig) { fprintf(stderr, "consumer_consume_loop"); #endif - while (consumerConfig->run) { + while (consumerConfig->run && !KLAXON) { if (rd_kafka_consume_callback(consumerConfig->rkt, consumerConfig->partition, 1000/*timeout*/, msg_consume, @@ -893,6 +925,9 @@ void Init_hermann_lib() { fprintf(stderr, "init_hermann_lib\n"); #endif + /* Chain our signal handler with Ruby VM's */ + hook_into_sighandler_chain(); + /* Define the module */ hermann_module = rb_define_module("Hermann"); lib_module = rb_define_module_under(hermann_module, "Lib"); diff --git a/ext/hermann/hermann_lib.h b/ext/hermann/hermann_lib.h index d7fbdfa..6f08f01 100644 --- a/ext/hermann/hermann_lib.h +++ b/ext/hermann/hermann_lib.h @@ -50,6 +50,14 @@ static VALUE hermann_module; static int DEBUG = 0; +// Hold the system signal handler +static void (*ruby_vm_sighandler)(int) = NULL; + +// Global klaxon announcing we're going down +// 0 - normal operation +// 1 - we're going down +static int KLAXON = 0; + // Should we expect rb_thread_blocking_region to be present? // #define RB_THREAD_BLOCKING_REGION #undef RB_THREAD_BLOCKING_REGION