Commit some work in progress thoughts around the checkpoint and follower module

This commit is contained in:
R Tyler Croy 2019-11-18 19:00:44 -08:00
parent 4ca3c315e8
commit 3e0924820d
No known key found for this signature in database
GPG Key ID: E5C92681BEF6CEA2
4 changed files with 114 additions and 0 deletions

View File

@ -1,4 +1,6 @@
/*
* Copyright 2019 (c) R. Tyler Croy <rtyler@brokenco.de>
*
* This module contains the database interaction code necessary for storing
* checkpoints in our local SQLite database
*/

90
src/follower.rs Normal file
View File

@ -0,0 +1,90 @@
/*
* Copyright 2019 (c) R. Tyler Croy <rtyler@brokenco.de>
*
* The follower module contains all the necessary code for following files, up to and including the
* storing of checkpoints and so on.
*
*
* Portions of this code were adapted from Timothy Bess' work in a Rust tail implementation which
* was copyrighted in 2018 for <tdbgamer@gmail.com> and can be found:
* <https://github.com/tdbgamer/Tail> and was licensed under the ASF 2.0 license.
*/
use std::fs;
use std::fs::{File, Metadata};
use std::io::{Seek, BufReader, SeekFrom};
use std::path::{Path, PathBuf};
#[derive(Debug, PartialEq)]
enum ModificationType {
Added,
Removed,
Rotated,
Truncated,
NoChange,
}
/**
* The FileFollower struct contains the necessary properties to follow the output of a file as new
* buffers are written to it.
*
*/
struct FileFollower {
fd: BufReader<File>,
/**
* The metadata will contain the inode (ino()), filesize (len()), and any other properties
* which can be used to checkpoint or handle file changes
*/
metadata: Metadata,
/// The cursor will be reset if the file appears to have changed
cursor: SeekFrom,
path: PathBuf,
// vector of buffers needed to store messages between sending interval
}
impl FileFollower {
pub fn new(fd: File, file_name: String) -> Self {
FileFollower {
metadata: fd.metadata().unwrap(),
fd: BufReader::new(fd),
cursor: SeekFrom::Start(0),
path: Path::new(&file_name).to_path_buf(),
}
}
/**
* Return the ModificationType depending on the differences determined when examining a file's
* older Metadata and its current Metadata
*/
fn modification_between(old: Metadata, current: Metadata) -> ModificationType {
ModificationType::NoChange
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_no_modification() {
let old = fs::metadata(file!()).unwrap();
let current = fs::metadata(file!()).unwrap();
let mod_type = FileFollower::modification_between(old, current);
assert_eq!(mod_type, ModificationType::NoChange);
}
#[test]
fn test_rotated_modification() {
/*
* When the old and current metadata have different inodes, we can safely assume that the
* file has been rotated
*/
let old = fs::metadata(file!()).unwrap();
let current = fs::metadata(file!()).unwrap();
let mod_type = FileFollower::modification_between(old, current);
assert_eq!(mod_type, ModificationType::Rotated);
}
}

View File

@ -1,3 +1,9 @@
/*
* Copyright 2019 (c) R. Tyler Croy <rtyler@brokenco.de>
*
* Main entrypoint for filekäfer, mostly does argument processing and kicks off the runloop.
*/
extern crate clap;
extern crate futures;
extern crate inotify;
@ -20,6 +26,8 @@ mod config;
mod kafka;
mod tail;
mod checkpoints;
mod models;
mod follower;
use kafka::Kafka;
fn main() -> std::io::Result<()> {

14
src/models.rs Normal file
View File

@ -0,0 +1,14 @@
/*
* Copyright 2019 (c) R. Tyler Croy <rtyler@brokenco.de>
*
* Models are serializable structs to pass in and out of Diesel
*/
#[derive(Queryable, Debug)]
pub struct Checkpoint {
pub id: i64,
pub filepath: String,
pub filesize: i64,
pub inode: i64,
pub offset_bytes: i64,
}