270 changes: 153 additions & 117 deletions crates/commitlog/src/commitlog.rs
@@ -107,81 +107,32 @@ impl<R: Repo, T> Generic<R, T> {

/// Update the current epoch.
///
/// Calls [`Self::commit`] to flush all data of the previous epoch, and
/// returns the result.
///
/// Does nothing if the given `epoch` is equal to the current epoch.
///
/// # Errors
///
/// If `epoch` is smaller than the current epoch, an error of kind
/// [`io::ErrorKind::InvalidInput`] is returned.
///
/// Also see [`Self::commit`].
pub fn set_epoch(&mut self, epoch: u64) -> io::Result<Option<Committed>> {
use std::cmp::Ordering::*;

match epoch.cmp(&self.head.epoch()) {
Less => Err(io::Error::new(
pub fn set_epoch(&mut self, epoch: u64) -> io::Result<()> {
if epoch < self.head.epoch() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"new epoch is smaller than current epoch",
)),
Equal => Ok(None),
Greater => {
let res = self.commit()?;
self.head.set_epoch(epoch);
Ok(res)
}
));
}
}

/// Write the currently buffered data to storage and rotate segments as
/// necessary.
///
/// Note that this does not imply that the data is durable, in particular
/// when a filesystem storage backend is used. Call [`Self::sync`] to flush
/// any OS buffers to stable storage.
///
/// # Errors
///
/// If an error occurs writing the data, the current [`Commit`] buffer is
/// retained, but a new segment is created. Retrying in case of an `Err`
/// return value thus will write the current data to that new segment.
///
/// If this fails, however, the next attempt to create a new segment will
/// fail with [`io::ErrorKind::AlreadyExists`]. Encountering this error kind
/// means that something is seriously wrong with the underlying storage, and the
/// caller should stop writing to the log.
pub fn commit(&mut self) -> io::Result<Option<Committed>> {
self.panicked = true;
let writer = &mut self.head;
let sz = writer.commit.encoded_len();
// If the segment is empty, but the commit exceeds the max size,
// we got a huge commit which needs to be written even if that
// results in a huge segment.
let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size;
let writer = if should_rotate {
self.sync();
self.start_new_segment()?
} else {
writer
};

let ret = writer.commit().or_else(|e| {
warn!("Commit failed: {e}");
// Nb.: Don't risk a panic by calling `self.sync()`.
// We already gave up on the last commit, and will retry it next time.
self.start_new_segment()?;
Err(e)
});
self.panicked = false;
ret
self.head.set_epoch(epoch);
Ok(())
}
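A minimal usage sketch (not part of the diff) of the revised `set_epoch` contract, written against items visible in this file (`Generic::open`, `repo::Memory`, `Commit::DEFAULT_EPOCH`); the in-memory repo and default `Options` used by the tests below are assumed.

let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap();
assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
// Raising the epoch no longer flushes buffered data; it only tags subsequent commits.
log.set_epoch(42).unwrap();
assert_eq!(log.epoch(), 42);
// Lowering the epoch is rejected with `io::ErrorKind::InvalidInput`.
assert_eq!(log.set_epoch(7).unwrap_err().kind(), io::ErrorKind::InvalidInput);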

/// Force the currently active segment to be flushed to storage.
///
/// Using a filesystem backend, this means to call `fsync(2)`.
///
/// **Note** that this does not flush the buffered data from calls to
/// [Self::commit]; it only instructs the underlying storage to flush its
/// buffers. Call [Self::flush] prior to this method to ensure data from
/// all previous [Self::commit] calls is flushed to the underlying storage.
///
/// # Panics
///
/// As an `fsync` failure leaves a file in a more or less undefined state,
@@ -195,6 +146,22 @@ impl<R: Repo, T> Generic<R, T> {
self.panicked = false;
}

/// Flush the buffered data from previous calls to [Self::commit] to the
/// underlying storage.
///
/// Call [Self::sync] to instruct the underlying storage to flush its
/// buffers as well.
pub fn flush(&mut self) -> io::Result<()> {
self.head.flush()
}

/// Calls [Self::flush] and then [Self::sync].
fn flush_and_sync(&mut self) -> io::Result<()> {
self.flush()?;
self.sync();
Ok(())
}
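A minimal sketch (not part of the diff) of the intended call order, using the `mem_log` test helper from the tests below: `commit` only buffers in the segment writer, `flush` hands the buffered bytes to the backend, and `sync` asks the backend to persist them (the no-return, panic-on-failure signature of `sync` is assumed from its call sites in this file).

let mut log = mem_log::<[u8; 32]>(1024);
log.commit([(0, [7; 32])]).unwrap(); // buffered in the segment writer only
log.flush().unwrap(); // push the buffer to the underlying storage
log.sync(); // instruct the storage to flush its own buffers; panics on fsync failure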

/// The last transaction offset written to disk, or `None` if nothing has
/// been written yet.
///
@@ -303,8 +270,63 @@ impl<R: Repo, T> Generic<R, T> {
}

impl<R: Repo, T: Encode> Generic<R, T> {
pub fn append(&mut self, record: T) -> Result<(), T> {
self.head.append(record)
/// Write `transactions` to the log.
///
/// This will store all `transactions` as a single [Commit]
/// (note that `transactions` must not yield more than [u16::MAX] elements).
///
/// Data is buffered by the underlying segment [Writer].
/// Call [Self::flush] to force flushing to the underlying storage.
///
/// If, after writing the transactions, the writer's total written bytes
/// exceed [Options::max_segment_size], the current segment is flushed,
/// `fsync`ed and closed, and a new segment is created.
///
/// Returns `Ok(None)` if `transactions` was empty, otherwise [Committed],
/// which contains the offset range and checksum of the commit.
///
/// Note that supplying empty `transactions` may cause the current segment
/// to be rotated.
///
/// # Errors
///
/// An `Err` value is returned in the following cases:
///
/// - if the transaction sequence is invalid, e.g. because the transaction
/// offsets are not contiguous.
///
/// In this case, **none** of the `transactions` will be written.
///
/// - if creating the new segment fails due to an I/O error.
///
/// # Panics
///
/// The method panics if:
///
/// - `transactions` exceeds [u16::MAX] elements
///
/// - [Self::flush] or writing to the underlying [Writer] fails
///
/// This is likely caused by some storage issue. As we cannot tell with
/// certainty how much data (if any) has been written, the internal state
/// becomes invalid and thus a panic is raised.
///
/// - [Self::sync] panics (called when rotating segments)
pub fn commit<U: Into<Transaction<T>>>(
&mut self,
transactions: impl IntoIterator<Item = U>,
) -> io::Result<Option<Committed>> {
self.panicked = true;
let writer = &mut self.head;
let committed = writer.commit(transactions)?;
if writer.len() >= self.opts.max_segment_size {
self.flush().expect("failed to flush segment");
Review comment (Contributor Author): This seemed a bit surprising to me at first -- but the BufWriter has no way of knowing how many bytes did make it. So if flush fails, the buffer is basically garbage.
self.sync();
self.start_new_segment()?;
}
self.panicked = false;

Ok(committed)
}
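A minimal sketch (not part of the diff) of the new batched `commit` call, again using the `mem_log` helper; the `tx_range` field on [Committed] and the `(u64, T)` tuple conversion into [Transaction] are assumed from their use elsewhere in this file.

let mut log = mem_log::<[u8; 32]>(128);
// One call stores the whole batch as a single `Commit`.
let committed = log
    .commit([(0, [1; 32]), (1, [2; 32])])
    .unwrap()
    .expect("non-empty batch should yield a Committed");
assert_eq!(committed.tx_range.start, 0);
// An empty batch returns `Ok(None)`, though it may still rotate the segment.
assert!(log.commit(std::iter::empty::<(u64, [u8; 32])>()).unwrap().is_none());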

pub fn transactions_from<'a, D>(
@@ -348,8 +370,8 @@ impl<R: Repo, T: Encode> Generic<R, T> {
impl<R: Repo, T> Drop for Generic<R, T> {
fn drop(&mut self) {
if !self.panicked {
if let Err(e) = self.head.commit() {
warn!("failed to commit on drop: {e}");
if let Err(e) = self.flush_and_sync() {
warn!("failed to flush on drop: {e:#}");
}
}
}
@@ -920,7 +942,7 @@ fn range_is_empty(range: &impl RangeBounds<u64>) -> bool {

#[cfg(test)]
mod tests {
use std::{cell::Cell, iter::repeat, num::NonZeroU16};
use std::{cell::Cell, iter::repeat};

use pretty_assertions::assert_matches;

@@ -933,30 +955,31 @@ mod tests {
#[test]
fn rotate_segments_simple() {
let mut log = mem_log::<[u8; 32]>(128);
for _ in 0..3 {
log.append([0; 32]).unwrap();
log.commit().unwrap();
for i in 0..4 {
log.commit([(i, [0; 32])]).unwrap();
}
log.flush_and_sync().unwrap();

let offsets = log.repo.existing_offsets().unwrap();
assert_eq!(&offsets[..offsets.len() - 1], &log.tail);
assert_eq!(offsets[offsets.len() - 1], 2);
// TODO: We overshoot the max segment size.
assert_eq!(&offsets, &[0, 3]);
}

#[test]
fn huge_commit() {
let mut log = mem_log::<[u8; 32]>(32);

log.append([0; 32]).unwrap();
log.append([1; 32]).unwrap();
log.commit().unwrap();
assert!(log.head.len() > log.opts.max_segment_size);
log.commit([(0, [0; 32]), (1, [1; 32])]).unwrap();
log.flush_and_sync().unwrap();
// First segment got rotated out.
assert_eq!(&log.tail, &[0]);

log.append([2; 32]).unwrap();
log.commit().unwrap();
log.commit([(2, [2; 32])]).unwrap();
log.flush_and_sync().unwrap();

assert_eq!(&log.tail, &[0]);
assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2]);
// Second segment got rotated out and segment 3 is created.
assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2, 3]);
}

#[test]
@@ -1052,14 +1075,31 @@ mod tests {
fn traverse_commits_ignores_duplicates() {
let mut log = mem_log::<[u8; 32]>(1024);

log.append([42; 32]).unwrap();
let commit1 = log.head.commit.clone();
log.commit().unwrap();
log.head.commit = commit1.clone();
log.commit().unwrap();
log.append([43; 32]).unwrap();
let commit2 = log.head.commit.clone();
log.commit().unwrap();
let tx1 = [42u8; 32];
let tx2 = [43u8; 32];

log.commit([(0, tx1)]).unwrap();
let commit1 = Commit {
min_tx_offset: 0,
n: 1,
records: tx1.to_vec(),
..log.head.commit.clone()
};

// Reset the commit offset, so we can write the same commit twice.
log.head.commit.min_tx_offset = 0;
log.commit([(0, tx1)]).unwrap();

// Write another one.
log.commit([(1, tx2)]).unwrap();
let commit2 = Commit {
min_tx_offset: 1,
n: 1,
records: tx2.to_vec(),
..log.head.commit.clone()
};

log.flush_and_sync().unwrap();

assert_eq!(
[commit1, commit2].as_slice(),
@@ -1074,15 +1114,14 @@ mod tests {
fn traverse_commits_errors_when_forked() {
let mut log = mem_log::<[u8; 32]>(1024);

log.append([42; 32]).unwrap();
log.commit().unwrap();
log.head.commit = Commit {
min_tx_offset: 0,
n: 1,
records: [43; 32].to_vec(),
epoch: 0,
};
log.commit().unwrap();
log.commit([(0, [42; 32])]).unwrap();
// Reset the commit offset,
// and write a different commit at the same offset.
// This is considered a fork.
log.head.commit.min_tx_offset = 0;
log.commit([(0, [43; 32])]).unwrap();

log.flush_and_sync().unwrap();

let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
assert!(
@@ -1095,11 +1134,11 @@ mod tests {
fn traverse_commits_errors_when_offset_not_contiguous() {
let mut log = mem_log::<[u8; 32]>(1024);

log.append([42; 32]).unwrap();
log.commit().unwrap();
log.commit([(0, [42; 32])]).unwrap();
log.head.commit.min_tx_offset = 18;
log.append([42; 32]).unwrap();
log.commit().unwrap();
log.commit([(18, [42; 32])]).unwrap();

log.flush_and_sync().unwrap();

let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
assert!(
Expand All @@ -1111,7 +1150,7 @@ mod tests {
prev_error: None
})
),
"expected fork error: {res:?}"
"expected out-of-order error: {res:?}"
)
}

@@ -1221,7 +1260,7 @@ mod tests {
#[test]
fn reopen() {
let mut log = mem_log::<[u8; 32]>(1024);
let mut total_txs = fill_log(&mut log, 100, (1..=10).cycle());
let total_txs = fill_log(&mut log, 100, (1..=10).cycle());
assert_eq!(
total_txs,
log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count()
@@ -1231,12 +1270,11 @@ mod tests {
log.repo.clone(),
Options {
max_segment_size: 1024,
max_records_in_commit: NonZeroU16::new(10).unwrap(),
..Options::default()
},
)
.unwrap();
total_txs += fill_log(&mut log, 100, (1..=10).cycle());
let total_txs = fill_log(&mut log, 100, (1..=10).cycle());

assert_eq!(
total_txs,
@@ -1245,24 +1283,22 @@ mod tests {
}

#[test]
fn set_same_epoch_does_nothing() {
fn set_new_epoch() {
let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap();
assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap();
assert_eq!(committed, None);
}

#[test]
fn set_new_epoch_commits() {
let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap();
assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
log.append(<_>::default()).unwrap();
let committed = log
.set_epoch(42)
.unwrap()
.expect("should have committed the pending transaction");
log.commit([(0, [12; 32])]).unwrap();
log.set_epoch(42).unwrap();
assert_eq!(log.epoch(), 42);
assert_eq!(committed.tx_range.start, 0);
log.commit([(1, [13; 32])]).unwrap();

log.flush_and_sync().unwrap();

let epochs = log
.commits_from(0)
.map(Result::unwrap)
.map(|commit| commit.epoch)
.collect::<Vec<_>>();
assert_eq!(&[Commit::DEFAULT_EPOCH, 42], epochs.as_slice());
}

#[test]