Updated Protocol

Improved the response protocol with a status field and
byte length of response payload.
This commit is contained in:
Joseph Rothrock 2012-12-15 16:24:47 -08:00
parent 99de1ba924
commit 459134f146
5 changed files with 219 additions and 187 deletions

120
README
View File

@ -45,25 +45,37 @@ The database supports the notion of a composite key. That is, a key
that is subdivided into a hierarchy that all keys participate in. The key-
space is similar to a typical Unix filesystem organized into directories.
To work with hierarchical keys, divide them with slashes '/'. The last
element becomes the value. Like so:
To work with hierarchical keys, divide them with spaces ' '. The last
element becomes the value. The 'value' does not participate in the
key-space.
/a/b/c/my_value
The commands
The keys are grouped into a tree of subdirectories:
/
a/
b/
c/
create finance accounting payroll employees 50
create finance accounting receivables employees 70
Creates a 4-level hierarchy:
1 2 3 4
finance
accounting
payroll
employees
receivables
employees
The key-space then becomes a kind of database on its own. A client
can query the database for all the subkeys of a path. This gives clients
building blocks for range queries and ordered (sorted) results.
Nodes in the key-space are reference-counted so that repeating groups
in the hierarchy don't waste space.
All lookups of values are still done via hashmap of the entire key. In
other words, values can only be fetched by providing the entire key.
In this case, the hash-lookup key is '/a/b/c' which points to a starting
block in the db that contains 'my_value'.
other words, values can only be fetched by providing THE ENTIRE KEY.
In this case, the hash-lookup key is 'finance accounting payroll employees'.
It points to a starting block in the db that contains '50'.
This means that point-lookups of records will always be very fast, but that
the operator can use the key-space to organize and sort results.
@ -75,65 +87,69 @@ Usage Example
--------------
madison:Roxanne rothrock$ sudo dbr_ctl start
Started listening.
madison:Roxanne rothrock$ telnet localhost 4080
Trying ::1...
carp:Roxanne rothrock$ telnet localhost 4080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
create /alpha/The first record
create alpha record_1
STATUS: OK
SIZE: 9
Write OK.
read /alpha
The first record
create /alpha/beta/record
create alpha beta record_2
STATUS: OK
SIZE: 9
Write OK.
read /alpha/beta
record
delete /alpha
Delete OK.
read /alpha
Not found.
read /alpha/beta
record
read alpha
STATUS: OK
SIZE: 8
record_1
read alpha beta
STATUS: OK
SIZE: 8
record_2
...Now a demonstration of the 'keys' command:
create /alpha/phi/record
create alpha gamma record_3
STATUS: OK
SIZE: 9
Write OK.
create /alpha/1/record
create alpha delta record_4
STATUS: OK
SIZE: 9
Write OK.
create /alpha/zeta/record
Write OK.
keys /alpha
1
beta
phi
zeta
keys alpha
STATUS: OK
SIZE: 16
beta delta gamma
--------------
Protocol
--------------
Data manipulation commands are comprised of:
a keyword,
a space, and
a forward-slash.
Data manipulation commands:
create /key[/key].../value<newline>
create key [key]...value<newline>
read /key[/key]...
read key [key]...<newline>
delete /key[/key]...
delete key [key]...<newline>
keys /[key][/key]....
keys [key] [key]....<newline>
Other commands:
quit
Response:
"STATUS: <status string>\nSIZE: <integer>\n<integer> long string of bytes\n\n"
--------------
On Disk
@ -194,13 +210,13 @@ the append occurred.
#### The keydb (/var/roxanne/keydb)
Initially sized at 0 bytes, the keydb stores the composite key hierarchy.
Each level of the hierarchy is a simple, binary tree of the
Each level of the hierarchy is stored as a binary tree.
Each node in the hierarchy has: a key-part, a left pointer (less-than),
a right pointer (greater-than), and a next pointer that points to the
next level in the hierarchy.
Nodes have a reference count, so duplicate key-parts don't require
Nodes have a reference count so that duplicate key-parts don't require
additional space. Unfortunately, the database does not yet support
reclamation of keydb nodes with a reference count of 0.
@ -208,8 +224,8 @@ reclamation of keydb nodes with a reference count of 0.
Consider the following two composite keys (values left off):
/foo/bar/toast
/foo/bar/jam
foo bar toast
foo bar jam
This set of two composite keys comprises 4 nodes in the keydb, stored
like so:
@ -223,10 +239,10 @@ jam 1 NULL NULL NULL
Next, add these composite keys (again, values left off).
/foo/bar/whiskey
/zen
/zoo
/egg
foo bar whiskey
zen
zoo
egg
The table now looks like this:

View File

@ -21,6 +21,7 @@ THE SOFTWARE.
*/
#include "roxanne_db.h"
#include "status_codes.h"
char DATA_HOME[4096] = "/var/roxanne";
@ -273,7 +274,7 @@ int start_listening(char* host, char* port, int backlog) {
int tokenize_command(char* msg, char* token_vector[]) {
char **token_ptr;
int i = 0;
for (token_ptr = token_vector; (*token_ptr = strsep(&msg, " \t")) != NULL;)
for (token_ptr = token_vector; (*token_ptr = strsep(&msg, " ")) != NULL;)
if (**token_ptr != '\0') {
i++;
if (++token_ptr >= &token_vector[MAX_ARGS])
@ -655,16 +656,17 @@ int write_record(char* key, char* value) {
int guts(int accept_fd, int listen_fd) {
char buffer[RECV_WINDOW] = ""; // recv buffer
char buffer[RECV_WINDOW] = ""; // recv buffer
int msgbuflen = MSG_SIZE;
char *msg; // Holds our incoming and outgoing messages.
char * msg_copy;
char status_msg[MSG_SIZE];
char *msg; // Incoming message.
char *send_msg; // Outgoing message.
char *tmp_msg;
void *msg_cursor;
char *response;
int msglen = 0; // length of the assembled message that we receive.
int recvlen = 0; // how many bytes recv call returns.
int responselen = 0;
struct response_struct response;
int msglen = 0; // length of the assembled message that we receive.
int recvlen = 0; // how many bytes recv call returns.
int responselen = 0;
int offset;
int retval;
char* token_vector[MAX_ARGS] = {'\0'};
@ -724,8 +726,8 @@ int guts(int accept_fd, int listen_fd) {
}
msg_copy = msg;
strsep(&msg_copy, "\r\n");
tmp_msg = msg;
strsep(&tmp_msg, "\r\n");
token_count = tokenize_command(msg, token_vector);
@ -752,22 +754,46 @@ int guts(int accept_fd, int listen_fd) {
break;
default:
response = malloc(sizeof(char) * MSG_SIZE);
bzero(response, MSG_SIZE);
sprintf(response, "Unknown command.\n");
if ((response.msg = malloc(sizeof(char) * MSG_SIZE)) == NULL) {
perror(NULL);
cleanup_and_exit;
}
bzero(response.msg, MSG_SIZE);
sprintf(response.msg, "Unknown command.");
response.status = 1;
}
responselen = strlen(response);
if((send(accept_fd, (void*)response, responselen, 0) == -1)) perror("Send failed");
responselen = prepare_send_msg(response, &send_msg);
if((send(accept_fd, (void*)send_msg, responselen, 0) == -1)) perror("Send failed");
free(msg);
free(response);
free(response.msg);
free(send_msg);
};
return(0);
}
int prepare_send_msg(struct response_struct response, char** send_msg) {
char status_msg[MSG_SIZE] = { '\0' };
int responselen;
sprintf(status_msg, "STATUS: %s\nSIZE: %d\n",
STATUS_CODES[response.status],
(int)strlen(response.msg));
responselen = strlen(response.msg) + strlen(status_msg) + 2;
if ((*send_msg = malloc(responselen)) == NULL) {
perror(NULL);
cleanup_and_exit();
}
*send_msg[0] = '\0';
strcat(*send_msg, status_msg);
strcat(*send_msg, response.msg);
strcat(*send_msg, "\n\n");
return responselen;
}
int bit_array_set(char bit_array[], int bit) {
int byte_offset = floor(bit/8);
@ -829,7 +855,7 @@ void hash_write_unlock(int hash_number) {
}
char* create_command(char* token_vector[], int token_count) {
struct response_struct create_command(char* token_vector[], int token_count) {
int length = 0;
int i = 0;
@ -840,13 +866,19 @@ char* create_command(char* token_vector[], int token_count) {
struct keydb_column *tuple = NULL;
struct keydb_column *head = NULL;
struct keydb_column *tmp;
char* response;
struct response_struct response;
response = malloc(sizeof(char) * MSG_SIZE);
bzero(response, MSG_SIZE);
response.status = 0;
if ((response.msg = malloc(sizeof(char) * MSG_SIZE)) == NULL) {
perror(NULL);
cleanup_and_exit();
}
bzero(response.msg, MSG_SIZE);
if (token_count < 3) {
sprintf(response, "Not enough arguments.\n");
response.status = 1;
sprintf(response.msg, "Not enough arguments.");
return response;
}
@ -855,15 +887,15 @@ char* create_command(char* token_vector[], int token_count) {
if (previous_part != NULL) {
length += strlen(previous_part);
if (length > KEY_LEN - 1) {
sprintf(response, "Key too large.\n");
response.status = 1;
sprintf(response.msg, "Key too large.");
return response;
}
// Save away the list of key composites
if ((tmp = malloc(sizeof(struct keydb_column))) == NULL) {
sprintf(response, "Internal error.");
perror("Call to malloc() failed in create_command for tuple->next.\n");
return response;
perror(NULL);
cleanup_and_exit;
}
strncpy(tmp->column, previous_part, KEY_LEN);
tmp->next = NULL;
@ -882,7 +914,8 @@ char* create_command(char* token_vector[], int token_count) {
}
if (key[0] == '\0') {
sprintf(response, "Failed to get value.\n");
response.status = 1;
sprintf(response.msg, "Failed to get value.");
return response;
}
@ -890,15 +923,18 @@ char* create_command(char* token_vector[], int token_count) {
if (retval == 0) {
if (composite_insert(KEYDB_FD, head) == -1) {
delete_record(key); // undo what we did.
fprintf(stderr, "Composite key insertion failed.\n");
sprintf(response, "Internal error.");
fprintf(stderr, "Composite key insertion failed.");
response.status = 1;
sprintf(response.msg, "Internal error.");
} else {
sprintf(response, "Write OK.\n");
sprintf(response.msg, "Write OK.");
}
} else if (retval == -2) { // key already exists.
sprintf(response, "Write failed. Key exists in the index.\n");
response.status = 1;
sprintf(response.msg, "Write failed. Key exists in the index.");
} else {
sprintf(response, "Internal error.");
response.status = 1;
sprintf(response.msg, "Internal error.");
}
while (head) { // free our list of key composites.
@ -909,7 +945,7 @@ char* create_command(char* token_vector[], int token_count) {
return response;
}
char* read_command(char* token_vector[], int token_count) {
struct response_struct read_command(char* token_vector[], int token_count) {
char* part = NULL;
char* value;
@ -919,20 +955,24 @@ char* read_command(char* token_vector[], int token_count) {
struct db_ptr db_rec;
int responselen = 0;
int i;
char* response;
struct response_struct response;
response = malloc(sizeof(char) * MSG_SIZE);
bzero(response, MSG_SIZE);
response.status = 0;
response.msg = malloc(sizeof(char) * MSG_SIZE);
bzero(response.msg, MSG_SIZE);
if (token_count == 1) {
sprintf(response, "No keys supplied.\n");
sprintf(response.msg, "No keys supplied.");
response.status = 1;
return response;
}
for (i = 1; token_vector[i] && i < MAX_ARGS; i++) {
strcat(key, token_vector[i]);
if (strlen(key) > KEY_LEN - 1) {
sprintf(response, "Key too long.\n");
sprintf(response.msg, "Key too long.");
response.status = 1;
return response;
}
}
@ -942,18 +982,19 @@ char* read_command(char* token_vector[], int token_count) {
value = read_record(db_rec);
responselen = strlen(value);
if (responselen >= MSG_SIZE) { // need to expand response.
free(response);
response = malloc(sizeof(char) * (responselen + 2));
free(response.msg);
response.msg = malloc(sizeof(char) * (responselen + 2));
}
sprintf(response, "%s\n", value);
sprintf(response.msg, "%s", value);
free(value);
} else {
sprintf(response, "Not found.\n");
sprintf(response.msg, "Not found.");
response.status = 1;
}
return response;
}
char* delete_command(char* token_vector[], int token_count){
struct response_struct delete_command(char* token_vector[], int token_count){
char* part = NULL;
char key[KEY_LEN] = "";
@ -964,28 +1005,33 @@ char* delete_command(char* token_vector[], int token_count){
struct keydb_column *head = NULL;
struct keydb_column *tmp;
int responselen = 0;
char* response;
struct response_struct response;
response.status = 0;
if ((response.msg = malloc(sizeof(char) * MSG_SIZE)) == NULL) {
perror(NULL);
cleanup_and_exit();
}
response = malloc(sizeof(char) * MSG_SIZE);
bzero(response, MSG_SIZE);
if (token_count < 2) {
sprintf(response, "Not enough arguments.\n");
response.status = 1;
sprintf(response.msg, "Not enough arguments");
return response;
}
for (i = 1; (part = token_vector[i]) && (i < MAX_ARGS); i++) {
length += strlen(part);
if (length > KEY_LEN - 1) {
sprintf(response, "Key too large.\n");
response.status = 1;
sprintf(response.msg, "Key too large");
return response;
}
// Save away the list of key composites
if ((tmp = malloc(sizeof(struct keydb_column))) == NULL) {
sprintf(response, "Delete failed.\n");
perror("Call to malloc() failed in delete_command.\n");
return response;
perror(NULL);
cleanup_and_exit();
}
strncpy(tmp->column, part, KEY_LEN);
tmp->next = NULL;
@ -1001,7 +1047,8 @@ char* delete_command(char* token_vector[], int token_count){
}
if (length == 0) {
sprintf(response, "Failed to extract key.\n");
response.status = 1;
sprintf(response.msg, "Failed to extract key.");
return response;
}
@ -1009,15 +1056,18 @@ char* delete_command(char* token_vector[], int token_count){
if (retval == 0) {
if (composite_delete(KEYDB_FD, head) == -1) {
fprintf(stderr, "Composite key delete failed.\n");
sprintf(response, "Composite key delete failed.\n");
fprintf(stderr, "Composite key delete failed.");
response.status = 1;
sprintf(response.msg, "Internal Error.");
} else {
sprintf(response, "Delete OK.\n");
sprintf(response.msg, "Delete OK.");
}
} else if (retval == -2) {
sprintf(response, "Not found.\n");
response.status = 1;
sprintf(response.msg, "Not found.");
} else {
sprintf(response, "Could not delete record.\n");
response.status = 1;
sprintf(response.msg, "Could not delete record.");
}
while (head) { // free our list of key composites.
@ -1028,7 +1078,7 @@ char* delete_command(char* token_vector[], int token_count){
return response;
}
char* keys_command(char* token_vector[], int token_count) {
struct response_struct keys_command(char* token_vector[], int token_count) {
char* part = NULL;
char key[KEY_LEN] = "";
@ -1040,38 +1090,46 @@ char* keys_command(char* token_vector[], int token_count) {
struct keydb_column *list, *tmp, *cursor;
bool some_content = false; // Does our key list have any keys in it?
list = NULL;
char* response;
char* tmp_response;
int response_free_bytes = MSG_SIZE;
int responselen = 0;
int column_size;
struct response_struct response;
response = malloc(sizeof(char) * MSG_SIZE);
bzero(response, MSG_SIZE);
response.status = 0;
if ((response.msg = malloc(sizeof(char) * MSG_SIZE)) == NULL) {
perror(NULL);
cleanup_and_exit();
}
bzero(response.msg, MSG_SIZE);
for (i = 1; (part = token_vector[i]) && (i < MAX_ARGS); i++) {
length += strlen(part);
if (length > KEY_LEN - 1) {
sprintf(response, "Key too large.\n");
response.status = 1;
sprintf(response.msg, "Key too large.");
return response;
}
node = keydb_find(KEYDB_FD, part, pos);
if (!(node = keydb_find(KEYDB_FD, part, pos))) {
sprintf(response, "Not found.\n");
response.status = 1;
sprintf(response.msg, "Not found.");
return response;
}
if (node->refcount <= 0) {
sprintf(response, "Not found.\n");
response.status = 1;
sprintf(response.msg, "Not found.");
return response;
}
pos = node->next;
free(node);
if (pos == 0) { // There is no next subtree.
sprintf(response, "No subkeys.\n");
response.status = 1;
sprintf(response.msg, "No subkeys.");
return response;
}
@ -1083,16 +1141,16 @@ char* keys_command(char* token_vector[], int token_count) {
column_size = strlen(list->column) + 2;
response_free_bytes -= column_size;
if (response_free_bytes < 1) { // need to expand response.
responselen = strlen(response);
responselen = strlen(response.msg);
responselen += column_size;
tmp_response = malloc(sizeof(char) * responselen);
strcpy(tmp_response, response);
free(response);
response = tmp_response;
strcpy(tmp_response, response.msg);
free(response.msg);
response.msg = tmp_response;
response_free_bytes = 0;
}
strcat(response, list->column);
strcat(response, "\n");
strcat(response.msg, list->column);
strcat(response.msg, " ");
some_content = true;
}
tmp = list->next;
@ -1100,70 +1158,19 @@ char* keys_command(char* token_vector[], int token_count) {
list = tmp;
}
if (!some_content) sprintf(response, "No subkeys.\n");
if (!some_content) {
sprintf(response.msg, "No subkeys.");
response.status = 1;
}
response.msg[strlen(response.msg) - 1] = '\0'; // Knock out that last extra space.
return response;
}
void count_command(char msg[], char response[]) {
char* part = NULL;
char key[KEY_LEN] = "";
int length = 0;
int retval;
int refcount = 0;
int64_t pos = 0;
struct keydb_node *node;
struct keydb_column *list, *tmp, *cursor;
list = NULL;
part = strtok(msg, "/");
for (part = strtok(NULL, "\r\n/"); part; part = strtok(NULL, "\r\n/")) {
length += strlen(part);
if (length > KEY_LEN - 1) {
sprintf(response, "Key too long.\n");
part = NULL;
return;
}
if ((node = keydb_find(KEYDB_FD, part, pos))) {
pos = node->next;
refcount = node->refcount;
free(node);
if (pos == 0) { // There is no next subtree.
sprintf(response, "%d\n", refcount);
return;
}
} else {
sprintf(response, "Not found.\n");
return;
}
}
// Sum all the refcounts of all keys in the root tree.
// This is for 'count /'.
keydb_tree(KEYDB_FD, pos, &list);
while (list) {
if (list->column[0] != '\0') {
refcount += list->refcount;
}
tmp = list->next;
free(list);
list = tmp;
}
sprintf(response, "%d\n", refcount);
return;
}
void usage(char *argv) {
fprintf(stderr, "usage: %s [-h listen_addr] [-p listen_port] [-d /path/to/db/directory]\n", argv);
exit(-1);
}
int keydb_txlog_reset() {
// Creates a transaction log file if one doesn't exist.
// Otherwise it truncates the transaction log file.

View File

@ -75,6 +75,11 @@ THE SOFTWARE.
#define KEYDB_LOCKS 1024
#define MAX_ARGS 100
struct response_struct {
unsigned int status;
char* msg;
};
struct idx { // structure for an index record.
char key[KEY_LEN];
@ -123,7 +128,6 @@ int IDX_FD;
// Function signatures
int start_listening(char* host, char* port, int backlog);
void sigchld_handler(int s);
@ -146,10 +150,10 @@ void hash_write_lock(int hash_number);
void hash_write_unlock(int hash_number);
void cleanup_and_exit();
void usage(char *argv);
char* create_command(char* token_vector[], int token_count);
char* read_command(char* token_vector[], int token_count);
char* delete_command(char* token_vector[], int token_count);
char* keys_command(char* token_vector[], int token_count);
struct response_struct create_command(char* token_vector[], int token_count);
struct response_struct read_command(char* token_vector[], int token_count);
struct response_struct delete_command(char* token_vector[], int token_count);
struct response_struct keys_command(char* token_vector[], int token_count);
int keydb_insert(int fd, char column[], int64_t pos, bool go_next);
void keydb_lock(int64_t pos);
void keydb_unlock(int64_t pos);
@ -159,3 +163,4 @@ void* keydb_tree(int fd, int64_t pos, struct keydb_column **list);
int find_free_key_node(int keydb_fd);
int connect_and_add_node(int direction, struct keydb_node* buffer, char column[], int pos, int fd);
int new_subkey_tree(int fd, char column[], int64_t pos, struct keydb_node *buffer);
int prepare_send_msg(struct response_struct response, char** send_msg);

4
status_codes.h Normal file
View File

@ -0,0 +1,4 @@
char* STATUS_CODES[] = {
"OK",
"FAIL"
};

View File

@ -83,8 +83,8 @@ void* keydb_tree(int fd, int64_t pos, struct keydb_column **list) {
if (buffer == NULL) return NULL;
if ((mid = malloc(sizeof(struct keydb_column))) == NULL) {
perror("Call to malloc() failed in keydb_tree.\n");
return NULL;
perror(NULL);
cleanup_and_exit;
}
mid->next = NULL;