Savepoint. keydb read code implemented but not tested

This commit is contained in:
Joseph Rothrock 2012-01-08 13:57:24 -08:00
parent 2a82ad22e7
commit 8f0309af0a
6 changed files with 383 additions and 28 deletions

View File

@ -1,17 +1,21 @@
CC := gcc
OS := $(shell uname)
CFLAGS := -Werror -g
OBJ := roxanne_db.o tuple_bits.o hash_32.o
DEPS := roxanne_db.h
ifeq (${OS},Linux)
libs = -lrt -lm
LIBS := -lrt -lm
endif
default: dbr
dbr: roxanne_db.c
gcc -Werror -g hash_32.c roxanne_db.c -o dbr $(libs)
dbr: $(OBJ) $(DEPS)
gcc -o dbr $(OBJ) $(CFLAGS) $(LIBS)
chmod 755 dbr
.PHONY: clean
clean:
rm -rf dbr.dSYM dbr
rm -rf dbr.dSYM dbr *.o
install:
install dbr /usr/local/bin

26
README
View File

@ -28,6 +28,9 @@ Hash collisions are resolved by separate chaining onto linked lists at the
end of the index file. The default location for the index file is in
/var/roxanne/idx
In addition to value lookups by key, the database provides a way to group
keys in a hierarchical directory structure. See Composite Keys below.
The values for the given keys are stored in contiguous 4KB blocks in the
database file (/var/roxanne/db). A file called block_bitmap tracks the
free/busy blocks in the db file. The dbr processes memory-map this file
@ -36,7 +39,28 @@ Blocks are only added to the db file as needed to accomodate new records.
As typically built, the database can accommodate about a billion blocks.
--------------
Example
Composite Keys
--------------
The database supports the notion of a composite key: a key that is
subdivided into components, forming a hierarchy that all keys participate in.
To work with hierarchical keys, divide them with slashes '/'. The last
element becomes the value. Like so:
/a/b/c/my_value
The key-space then becomes a kind of database on its own. A client
can query the database for all the subkeys of a path. This gives clients
building blocks for range queries and ordered (sorted) results.
All lookups of values are still done via hashmap of the entire key. In
other words, values can only be fetched by providing the entire key.
This means that point-lookups of records will always be very fast.
XXX The ability to read subkeys is still in development.
--------------
Usage Example
--------------
madison:Roxanne rothrock$ sudo dbr_ctl start
Started listening.

View File

@ -8,7 +8,7 @@ RUN_AS_USER='nobody'
RUN_AS_GRP='nobody'
DB_PATH='/var/roxanne'
FILES="block_bitmap idx db"
FILES="block_bitmap idx db keydb"
function usage {
echo "Usage: $0 {start|stop|kill|initdb [force]}"
@ -62,6 +62,8 @@ case "$1" in
sudo -u $RUN_AS_USER dd if=/dev/zero of=$DB_PATH/idx bs=1024 count=65536
sudo -u $RUN_AS_USER cat /dev/null >$DB_PATH/db
chown $RUN_AS_USER:$RUN_AS_GRP $DB_PATH/db
sudo -u $RUN_AS_USER cat /dev/null >$DB_PATH/keydb
chown $RUN_AS_USER:$RUN_AS_GRP $DB_PATH/keydb
;;

View File

@ -30,9 +30,12 @@ sem_t* HASH_READ_LOCK;
char *SHM_BLOCK_BITMAP;
char *SHM_HASHBUCKET_BITMAP;
int BLOCK_BITMAP_FD;
int KEYDB_FD;
int DB_FD;
int IDX_FD;
int main(int argc, char* argv[]) {
struct sockaddr incoming;
@ -40,12 +43,11 @@ int main(int argc, char* argv[]) {
int listen_fd, accept_fd;
char* port = "4080";
char* host = "::1";
char keydb_file[4096] = "/var/roxanne/keydb";
char db_file[4096] = "/var/roxanne/db";
char idx_file[4096] = "/var/roxanne/idx";
char block_bitmap_file[4096] = "/var/roxanne/block_bitmap";
int chld;
//int shm_block_offset_id;
//key_t shm_block_offset_key = 1;
int i;
int ch;
@ -55,6 +57,7 @@ int main(int argc, char* argv[]) {
switch (ch) {
case 'd':
sprintf(keydb_file, "%s/keydb", optarg);
sprintf(db_file, "%s/db", optarg);
sprintf(idx_file, "%s/idx", optarg);
sprintf(block_bitmap_file, "%s/block_bitmap", optarg);
@ -141,6 +144,13 @@ int main(int argc, char* argv[]) {
exit(-1);
}
// Open our keydb file
if ((KEYDB_FD = open(keydb_file, O_RDWR | O_CREAT, 0666)) == -1) {
fprintf(stderr, "Couldn't open key file named %s\n", keydb_file);
perror(NULL);
exit(-1);
}
// Open our index file
if ((IDX_FD = open(idx_file, O_RDWR | O_CREAT, 0666)) == -1) {
fprintf(stderr, "Couldn't open index file named %s\n", idx_file);
@ -327,7 +337,7 @@ struct db_ptr find_db_ptr(char* key) {
struct idx index_rec = {};
struct db_ptr db_rec = {.block_offset = -1, .blocks = -1};
int result;
int64_t pos = hash_id * IDX_ENTRY_SIZE;
int64_t pos = hash_id * IDX_ENTRY_SIZE;
while (1) {
@ -638,15 +648,7 @@ int guts(int accept_fd, int listen_fd) {
int msglen = 0; // length of the assembled message that we receive.
int recvlen = 0; // how many bytes recv call returns.
int responselen = 0;
//int length = 0;
//char key[KEY_LEN];
//char* value;
//char* part;
//char* previous_part;
//char* cmd_offset;
//struct db_ptr db_rec;
int retval;
// Re-register the sigterm handler to our cleanup function.
signal(SIGTERM, sigterm_handler_child);
@ -693,11 +695,6 @@ int guts(int accept_fd, int listen_fd) {
}
//length = 0;
//part = NULL;
//previous_part = NULL;
//key[0] = '\0';
switch (extract_command(msg, msglen)) {
case 0: // create
@ -793,19 +790,42 @@ void create_command(char msg[], char response[]) {
char* previous_part = NULL;
int retval = 0;
char key[KEY_LEN] = "";
struct keydb_column *tuple = NULL;
struct keydb_column *first = NULL;
struct keydb_column *tmp;
if ((part = strtok(msg, "/")) == NULL) {
sprintf(response, "Missing key.\n");
return;
}
for (part = strtok(NULL, "\r\n/"); part; part = strtok(NULL, "\r\n/")) {
if (previous_part != NULL) {
length += strlen(previous_part);
if (length > KEY_LEN - 1) {
sprintf(response, "Key too large.\n");
return;
}
// Save away the list of key composites
if ((tmp = malloc(sizeof(struct keydb_column))) == NULL) {
sprintf(response, "Call to malloc() failed in create_command.\n");
perror("Call to malloc() failed in create_command for tuple->next.\n");
return;
}
strncpy(tmp->column, previous_part, KEY_LEN);
tmp->next = NULL;
if (tuple == NULL) {
tuple = tmp;
first = tmp;
} else {
tuple->next = tmp;
tuple = tuple->next;
tuple->next = NULL;
}
strcat(key, previous_part);
}
previous_part = part;
}
@ -817,13 +837,25 @@ void create_command(char msg[], char response[]) {
retval = write_record(key, previous_part);
if (retval == 0) {
sprintf(response, "Write OK.\n");
if (composite_insert(KEYDB_FD, first) == -1) {
delete_record(key); // undo what we did.
fprintf(stderr, "Composite key insertion failed.\n");
sprintf(response, "Write failed. Composite key insertion failed.\n");
} else {
sprintf(response, "Write OK.\n");
}
} else if (retval == -2) { // key already exists.
sprintf(response, "Write failed. Key exists in the index.\n");
} else {
sprintf(response, "write_record() failed. Don't know why.\n");
}
while (first) { // free our list of key composites.
tmp = first->next;
free(first);
first = tmp;
};
}
void read_command(char msg[], char response[]) {
@ -909,3 +941,4 @@ void usage(char *argv) {
}

View File

@ -37,6 +37,7 @@ THE SOFTWARE.
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <signal.h>
#include <math.h>
@ -64,20 +65,35 @@ THE SOFTWARE.
#define MSG_SIZE 65536
#define HASH_BITS 16
#define IDX_ENTRY_SIZE 1024
#define KEY_LEN (IDX_ENTRY_SIZE - 3*(sizeof(int)))
#define KEY_LEN (IDX_ENTRY_SIZE - 2*(sizeof(int)) - sizeof(int64_t))
struct idx { // structure for an index record.
char key[KEY_LEN];
int block_offset; // starting block in the db file.
int length; // db blocks consumed.
int next; // overflow ptr to next index_record on disk.
int64_t next; // overflow ptr to next index_record on disk.
};
struct db_ptr { // a structure that points to a value in the db file.
int block_offset;
int blocks;
int64_t block_offset;
int blocks;
};
// In-memory singly linked list node holding one component ("column") of a
// composite key. create_command builds one node per key component, and
// keydb_tree returns its results as a list of these. The list's owner frees it.
struct keydb_column {
char column[KEY_LEN]; // one key component
struct keydb_column *next; // following component/list entry, or NULL at the end
};
// One record of the keydb file. Records are read and written whole with
// pread()/pwrite() of sizeof(struct keydb_node), so this layout is the
// on-disk format. left, right and next are byte offsets into the keydb file;
// the code treats an offset of 0 as "no link".
struct keydb_node {
char column[KEY_LEN]; // key component stored at this node
int refcount; // bumped when the same column is inserted again; 0 is treated as tombstoned
int64_t left; // offset of the child holding smaller columns, or 0
int64_t right; // offset of the child holding larger columns, or 0
int64_t next; // offset of the root of the tree for the following key component, or 0
};
@ -105,3 +121,9 @@ void usage(char *argv);
void create_command(char msg[], char response[]);
void read_command(char msg[], char response[]);
void delete_command(char msg[], char response[]);
int keydb_insert(int fd, char column[], int64_t pos, bool go_next);
int keydb_lock(int64_t pos);
int keydb_unlock(int64_t pos);
int composite_insert(int KEYDB_FD, struct keydb_column *tuple);
struct keydb_node* keydb_find(int fd, char *key, int64_t pos);
struct keydb_column* keydb_tree(int fd, int64_t pos);

270
tuple_bits.c Normal file
View File

@ -0,0 +1,270 @@
#include "roxanne_db.h"
// Placeholder for locking the keydb record at byte offset pos.
// No locking is performed yet; the call always reports success (0).
int keydb_lock(int64_t pos) {
  (void)pos; // offset is not used by the stub
  return 0;
}
// Placeholder for releasing the lock on the keydb record at byte offset pos.
// No locking is performed yet; the call always reports success (0).
int keydb_unlock(int64_t pos) {
  (void)pos; // offset is not used by the stub
  return 0;
}
// Reads the keydb record stored at byte offset pos of fd.
//
// Returns a heap-allocated copy of the record that the caller must free(),
// or NULL when there is no usable record at pos: allocation or pread()
// failure, pos at/after EOF, a truncated record, or a tombstoned record
// (refcount == 0).
struct keydb_node* key_buf_read(int fd, int64_t pos) {
  struct keydb_node *node;
  ssize_t n;

  // calloc so that any bytes pread() does not fill are zero.
  if ((node = calloc(1, sizeof *node)) == NULL) {
    perror("Call to malloc() failed in key_buf_read");
    return NULL;
  }

  n = pread(fd, node, sizeof *node, pos);
  if (n == -1) {
    perror("pread() failed in key_buf_read");
    free(node);
    return NULL;
  }

  if ((size_t)n < sizeof *node) {
    // n == 0 is plain EOF. Anything else is a partial record, which must not
    // be handed out as if it were a complete node.
    if (n != 0) {
      fprintf(stderr, "Truncated keydb record at offset %lld.\n", (long long)pos);
    }
    free(node);
    return NULL;
  }

  if (node->refcount == 0) { // This record is tombstoned.
    free(node);
    return NULL;
  }

  return node;
}
// Returns a linked list of the keys (stored as struct keydb_column) found in
// the binary tree whose root record is at byte offset pos, in sorted order
// (left subtree, this node, right subtree). The 'next' trees hanging off the
// nodes are not followed.
//
// The caller must free every element of the returned list. Returns NULL when
// key_buf_read() yields no record at pos or when allocation fails.
// NOTE(review): key_buf_read() also returns NULL for a tombstoned node, so
// the subtree below such a node is not listed -- confirm that is intended.
struct keydb_column* keydb_tree(int fd, int64_t pos) {
  struct keydb_node *node;
  struct keydb_column *left = NULL;
  struct keydb_column *right = NULL;
  struct keydb_column *mid;
  struct keydb_column *tail;
  int64_t left_pos;
  int64_t right_pos;

  node = key_buf_read(fd, pos);
  if (node == NULL) return NULL;

  // OK, there really is something here.
  if ((mid = malloc(sizeof *mid)) == NULL) {
    perror("Call to malloc() failed in keydb_tree.\n");
    free(node);
    return NULL;
  }
  memcpy(mid->column, node->column, KEY_LEN);

  // Keep only what we still need so the record isn't held across recursion.
  left_pos = node->left;
  right_pos = node->right;
  free(node);

  // A child offset of 0 means "no child". Offset 0 is also where the root
  // record lives, so recursing on it would walk the root again, forever.
  if (left_pos != 0) left = keydb_tree(fd, left_pos);
  if (right_pos != 0) right = keydb_tree(fd, right_pos);

  mid->next = right;
  if (left == NULL) return mid;

  // Hang this node after the LAST element of the left list, not the first,
  // so the rest of the left list is neither dropped nor leaked.
  for (tail = left; tail->next != NULL; tail = tail->next) {
  }
  tail->next = mid;

  return left;
}
// Finds the node whose column matches key in the binary tree whose root
// record is at byte offset pos.
//
// Returns a heap-allocated copy of the matching record that the caller must
// free(), or NULL when the key is not in the tree (including when
// key_buf_read() yields no record somewhere along the search path).
struct keydb_node* keydb_find(int fd, char *key, int64_t pos) {
  struct keydb_node *node;
  int cmp;

  for (;;) {
    node = key_buf_read(fd, pos);
    if (node == NULL) return NULL;

    cmp = strncmp(node->column, key, KEY_LEN);
    if (cmp == 0) return node;

    // Node on disk is smaller: go right. Otherwise go left.
    pos = (cmp < 0) ? node->right : node->left;
    free(node);

    // A child offset of 0 means there is nothing further down this side.
    if (pos == 0) return NULL;
  }
}
// Inserts every component of a composite key into the keydb file.
//
// tuple is the list of key components in order. The first component goes
// into the tree rooted at offset 0; each later component goes into the
// 'next' tree of the node the previous component landed on.
//
// Returns 0 on success and -1 if tuple is empty or any insert fails.
// Components inserted before a failure are left in place.
int composite_insert(int fd, struct keydb_column *tuple) {
  int64_t pos = 0;
  bool go_next = false; // only the first component targets the tree at pos itself

  if (tuple == NULL) return -1; // nothing to insert; don't dereference NULL

  for (; tuple != NULL; tuple = tuple->next) {
    pos = keydb_insert(fd, tuple->column, pos, go_next);
    if (pos == -1) return -1;
    go_next = true;
  }

  return 0;
}
int keydb_insert(int fd, char column[], int64_t pos, bool go_next) {
// inserts a node in the keydb tree. The go_next flag determines
// whether or not the key goes into the binary tree pointed at by
// the record stored at pos, or instead into the 'next' binary
// tree pointed at by the record at pos.
// returns the offset in the file where the insert occurred.
int n;
int comparison;
int64_t next_pos;
struct keydb_node *buffer;
struct stat stat_info;
keydb_lock(pos);
if ((buffer = malloc(sizeof(struct keydb_node))) == NULL) {
perror("Call to malloc() failed in keydb_insert.\n");
keydb_unlock(pos);
return -1;
}
bzero(buffer, sizeof(struct keydb_node));
n = pread(fd, buffer, sizeof(struct keydb_node), pos);
if (n == -1) {
perror("pread() failed in keydb_insert.\n");
keydb_unlock(pos);
free(buffer);
return -1;
}
if (go_next) {
if (n == 0) { // We can't go 'next' on a zero-length file.
keydb_unlock(pos);
fprintf(stderr, "pos is at EOF but we need to read a real record.\n");
free(buffer);
return -1;
}
if (buffer->next == 0) { // create our node and return it's position.
if ((next_pos = lseek(fd, 0, SEEK_END)) == -1) {
perror("lseek failed in keydb_insert\n");
keydb_unlock(pos);
free(buffer);
return -1;
}
keydb_lock(next_pos);
buffer->next = next_pos;
pwrite(fd, buffer, sizeof(struct keydb_node), pos);
bzero(buffer, sizeof(struct keydb_node));
strcpy(buffer->column, column);
buffer->refcount = 1;
pwrite(fd, buffer, sizeof(struct keydb_node), next_pos);
keydb_unlock(next_pos);
free(buffer);
return next_pos;
} else { // insert our node in the tree that next points to.
next_pos = buffer->next;
free(buffer);
keydb_unlock(pos);
return(keydb_insert(fd, column, next_pos, false));
}
}
//Since we're here, We should write our node into this particular tree. (go_next is false).
if (n == 0) { // nothing here. zero-length file. Just write and leave.
memcpy(buffer->column, column, KEY_LEN);
buffer->refcount = 1;
pwrite(fd, buffer, sizeof(struct keydb_node), pos);
keydb_unlock(pos);
free(buffer);
return pos;
}
// Start looking for a place to insert our new node
comparison = strcmp(buffer->column, column);
if (comparison > 0) { // node on disk is bigger, we need to go left.
if (buffer->left != 0) { // go try to insert on the left node.
keydb_unlock(pos);
pos = buffer->left;
free(buffer);
return keydb_insert(fd, column, pos, false);
} else { // There is no left node. Make one.
if ((next_pos = lseek(fd, 0, SEEK_END)) == -1) {
perror("lseek failed in keydb_insert\n");
keydb_unlock(pos);
free(buffer);
return -1;
}
keydb_lock(next_pos);
buffer->left = next_pos;
pwrite(fd, buffer, sizeof(struct keydb_node), pos);
bzero(buffer, sizeof(struct keydb_node));
strcpy(buffer->column, column);
buffer->refcount = 1;
pwrite(fd, buffer, sizeof(struct keydb_node), next_pos);
keydb_unlock(next_pos);
free(buffer);
return next_pos;
}
} else if (comparison < 0) { // node on disk is smaller, we need to go right.
if (buffer->right != 0) { // go try to insert on the right node.
keydb_unlock(pos);
pos = buffer->right;
free(buffer);
return keydb_insert(fd, column, pos, false);
} else { // There is no left node. Make one.
if ((next_pos = lseek(fd, 0, SEEK_END)) == -1) {
perror("lseek failed in keydb_insert\n");
keydb_unlock(pos);
free(buffer);
return -1;
}
keydb_lock(next_pos);
buffer->right = next_pos;
pwrite(fd, buffer, sizeof(struct keydb_node), pos);
bzero(buffer, sizeof(struct keydb_node));
strcpy(buffer->column, column);
buffer->refcount = 1;
pwrite(fd, buffer, sizeof(struct keydb_node), next_pos);
keydb_unlock(next_pos);
free(buffer);
return next_pos;
}
} else { // we match the node here. Simply up the refcount.
buffer->refcount++;
pwrite(fd, buffer, sizeof(struct keydb_node), pos);
keydb_unlock(pos);
free(buffer);
return pos;
}
return 0;
}