Added configuration property socket.keepalive.enable (enable SO_KEEPALIVE)

This commit is contained in:
Magnus Edenhill 2014-05-22 14:55:58 +02:00
parent 4cd83032e8
commit ced7e54ce6
4 changed files with 23 additions and 0 deletions

View File

@ -14,6 +14,7 @@ debug | | A comma-separated lis
socket.timeout.ms | 60000 | Timeout for network requests.
socket.send.buffer.bytes | 0 | Broker socket send buffer size. System default is used if 0.
socket.receive.buffer.bytes | 0 | Broker socket receive buffer size. System default is used if 0.
socket.keepalive.enable | false | Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets
broker.address.ttl | 300000 | How long to cache the broker address resolving results.
broker.address.family | any | Allowed broker IP address families: any, v4, v6
statistics.interval.ms | 0 | librdkafka statistics emit interval. The application also needs to register a stats callback using `rd_kafka_conf_set_stats_cb()`. The granularity is 1000ms. A value of 0 disables statistics.

View File

@ -1498,6 +1498,7 @@ int rd_kafka_socket_cb_generic (int domain, int type, int protocol,
static int rd_kafka_broker_connect (rd_kafka_broker_t *rkb) {
rd_sockaddr_inx_t *sinx;
int one __attribute__((unused)) = 1;
int on = 1;
rd_rkb_dbg(rkb, BROKER, "CONNECT",
"broker in state %s connecting",
@ -1529,6 +1530,22 @@ static int rd_kafka_broker_connect (rd_kafka_broker_t *rkb) {
strerror(errno));
#endif
/* Enable TCP keep-alives, if configured. */
if (rkb->rkb_rk->rk_conf.socket_keepalive) {
#ifdef SO_KEEPALIVE
if (setsockopt(rkb->rkb_s, SOL_SOCKET, SO_KEEPALIVE,
&on, sizeof(on)) == -1)
rd_rkb_dbg(rkb, BROKER, "SOCKET",
"Failed to set SO_KEEPALIVE: %s",
strerror(errno));
#else
rd_rkb_dbg(rkb, BROKER, "SOCKET",
"System does not support "
"socket.keepalive.enable (SO_KEEPALIVE)");
#endif
}
/* Connect to broker */
if (connect(rkb->rkb_s, (struct sockaddr *)sinx,
RD_SOCKADDR_INX_LEN(sinx)) == -1) {
rd_rkb_dbg(rkb, BROKER, "CONNECT",

View File

@ -134,6 +134,10 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
_RK(socket_rcvbuf_size),
"Broker socket receive buffer size. System default is used if 0.",
0, 100000000, 0 },
{ _RK_GLOBAL, "socket.keepalive.enable", _RK_C_BOOL,
_RK(socket_keepalive),
"Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets",
0, 1, 0 },
{ _RK_GLOBAL, "broker.address.ttl", _RK_C_INT,
_RK(broker_addr_ttl),
"How long to cache the broker address resolving results.",

View File

@ -130,6 +130,7 @@ struct rd_kafka_conf_s {
int socket_timeout_ms;
int socket_sndbuf_size;
int socket_rcvbuf_size;
int socket_keepalive;
char *clientid;
char *brokerlist;
int stats_interval_ms;