Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
6954121
Added basic test skeleton for simple key-value store
Apr 5, 2018
2d23c61
Basic key value store for testing
Apr 5, 2018
c77bb90
Removed swap file
Apr 5, 2018
1683a4c
Added very basic client test, passes
Apr 7, 2018
b5715c7
Removed sending client requests without session
edauterman Apr 7, 2018
0f5fc98
Refactored client library to allow for easier integration with RIFL
edauterman Apr 7, 2018
caf5b29
Revert "Refactored client library to allow for easier integration wit…
edauterman Apr 7, 2018
0c351f9
Removed more unnecessary code from session.go and removed
edauterman Apr 7, 2018
4aae0a4
fixes to run with modifications to session.go
edauterman Apr 7, 2018
624c410
removed leases from client library
edauterman Apr 7, 2018
716f9b5
client id rpc format
edauterman Apr 9, 2018
366185b
Basic mechanism for getting next client ID and replicating it to
edauterman Apr 12, 2018
5faa13c
Get client ID when starting a session
edauterman Apr 12, 2018
ec1f079
Basic client IDs working (minimal testing)
edauterman Apr 12, 2018
8bfdb9e
Added snapshotting for next client id
edauterman Apr 12, 2018
5e98402
Framework for map<clientID,map<seqNo,response>>
edauterman Apr 13, 2018
dd37120
Populate tables with results of Client RPCs
edauterman Apr 13, 2018
305739c
Fixed bug and locking
edauterman Apr 13, 2018
7567638
Added support for snapshotting clientResponseCache (basic sanity check
edauterman Apr 14, 2018
515b83d
Garbage collect entries in client response cache
edauterman Apr 14, 2018
f9f8bd0
Added basic RIFL tests
edauterman Apr 18, 2018
8c58fa0
Basic RIFL test passing
edauterman Apr 18, 2018
d30c684
forgot to add file to commit
edauterman Apr 18, 2018
0fbc7b5
Adding GC tests and shell script
edauterman Apr 19, 2018
e895df1
Tests cleaned up and all passing
edauterman Apr 19, 2018
f4d213c
Adding framework for restarting a cluster
edauterman Apr 19, 2018
502f323
Correctly restarting cluster
edauterman Apr 19, 2018
5cbffcb
All tests done and passing
edauterman Apr 19, 2018
745ebe3
Added sanity check to test script, forgot to add file before
edauterman Apr 20, 2018
a18c902
renamed file
edauterman Apr 20, 2018
4fb7620
Mark client RPCs with keys
edauterman Apr 28, 2018
df7fcfd
Added sync and record RPC types
edauterman Apr 28, 2018
54875ef
Storing record RPCs at witnesses
edauterman Apr 28, 2018
37d2157
Change client sessions to keep open connections with all raft servers
edauterman Apr 28, 2018
12724b7
Locking for sending requests in parallel, framework for sending to
edauterman Apr 28, 2018
84e3101
Parallelize sending records to witnesses
edauterman Apr 29, 2018
277ba38
fixes to sending to witnesses in parallel and design doc for RIFL
edauterman May 4, 2018
5d1e37d
fix markdown titles
edauterman May 4, 2018
cd3cba5
design doc updates
edauterman May 4, 2018
2cc9951
design doc updates
edauterman May 4, 2018
459eb64
refactor to not have redundant fields in client request
edauterman May 5, 2018
d4df114
support for applying client command locally
edauterman May 5, 2018
876c817
Fixes for framework to integrate CURP with other tests
edauterman May 5, 2018
9ad7f5a
very basic curp without sync working, race condition with deadlock
edauterman May 5, 2018
2ca4d4d
Sync implemented
edauterman May 6, 2018
a042b3f
design doc updates
edauterman May 7, 2018
530eaf0
fix
edauterman May 7, 2018
e2e3971
Comments for session.go
edauterman May 10, 2018
f7b01a2
formatting
edauterman May 10, 2018
48cf92f
Comments for fsm.go, raft.go
edauterman May 10, 2018
4864d36
Comments for commands.go
edauterman May 10, 2018
50ddbb9
adding diff
edauterman May 10, 2018
54d4d45
Comments for tests
edauterman May 10, 2018
1b816a3
design doc updates
edauterman May 10, 2018
94938a0
bug fixes
edauterman May 13, 2018
1e5d041
Added tests for commutative and non-commutative operations
edauterman May 13, 2018
6d2a439
comment
edauterman May 13, 2018
0b5781c
Write witness state to stable storage
edauterman May 14, 2018
1723e5b
start recovery from witness
edauterman May 17, 2018
0fff1aa
format
edauterman May 17, 2018
b5ca44a
first part of recovery from witness done (leader done, not witness)
edauterman May 20, 2018
5d56f25
handling recovery data request and unfreezing at witness
edauterman May 20, 2018
68fa36f
Check term number to avoid stale witness set
edauterman May 25, 2018
bab10bc
send to superquorum of witnesses
edauterman May 27, 2018
a1f4a86
recover operations committed in majority of f+1 witnesses
edauterman May 27, 2018
d4a62ec
test updates
edauterman May 30, 2018
ed5e562
test for leader recovery
edauterman Jun 4, 2018
74e3621
stable store instead of in memory store for benchmarking
edauterman Jun 10, 2018
30516b6
restructure directories
edauterman Jun 10, 2018
029ed58
benchmark client
edauterman Jun 10, 2018
9238922
remove binary
edauterman Jun 10, 2018
b82eba4
start single raft node
edauterman Jun 10, 2018
5a97309
initial test script
edauterman Jun 12, 2018
6fd06a0
scripts working for latency and throughput measurements
Jun 12, 2018
c94ea90
produce preliminary cdf
Jun 12, 2018
5aa24fb
produce cdfs for 100%, 95%, and 90% commutative
Jun 12, 2018
befa8af
Throughput vs latency graph generation
edauterman Jun 13, 2018
47dca81
script fixes
Jun 13, 2018
e8dabf5
fix to latency v throughput
Jun 14, 2018
16d39bf
silence logging messages for testing
Jun 14, 2018
191ff6d
Add testing documentation and scripts to generate throughput vs laten…
Jun 14, 2018
1802fa4
remove threading from client.go
edauterman Jun 14, 2018
2c91167
removed writing to stable store
edauterman Jul 22, 2018
9595658
updated benchmark to not use stable store
edauterman Aug 12, 2018
c8cc8c9
adding graphs
Aug 12, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file modified .gitignore
100644 → 100755
Empty file.
Empty file modified .travis.yml
100644 → 100755
Empty file.
45 changes: 45 additions & 0 deletions DESIGN.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
## CURP

### Completed for CURP
* Record and sync RPCs.
* Keys sent with client requests to track commutativity in client operations.
* Accept records only if operations stored in witnesses don't commute.
* Master tries to apply command only locally if commutative. If not commutative, replicates synchronously and responds that it synced.
* Master synchronously replicates commands sent in Sync RPCs.
* GC records at witnesses when done applying.
* Send to witnesses and master in parallel, check for success or sync. If failure, send sync to master.

### CURP Code Base
* `raft.go`: Witness state defined. Garbage collect at witnesses when operation completed. Support for handling record requests: accept and record if keys commutative and not leader, reject otherwise. Master syncs if operation not commutative, support for sync operation at master.
* `commands.go`: Sync and Record RPCs, add Synced field to ClientResponse to know if master synced. Add keys to ClientRequests.
* `session.go`: Sending to all witnesses and master in parallel. If all succeeded or synced at master, succeed. Otherwise, send Sync RPC to master. Keep repeating until success.
* `log.go`: Update log entry to contain keys for commutativity checks.
* `api.go`: Add witness state to raft nodes.
* `net_transport.go`: Add new RPC types.

## RIFL

### Completed for RIFL
* Added client IDs and sequence numbers to client RPCs
* Assign client ID at master using global nextClientId
* Replicate nextClientId counter to other servers with LogNextClientId operation
* Store responses to client RPCs in cache that is periodically garbage-collected based on configurable timeout
* Check for duplicate before applying to state machine
* Make nextClientId and cache of client responses persistent.

### RIFL Code Base
* `raft.go`: Support for ClientId RPC handling, incrementing nextClientId at all replicas
* `fsm.go`: Before applying a command locally, check for cached response.
* `client_response_cache.go`: Stores state about the response to a client RPC along with a timestamp. Cache is periodically garbage collected.
* `session.go`: Starting a client session requires getting a new client ID, use that client ID and assign monotonically increasing sequence numbers for client RPCs.
* `commands.go`: RPC format for ClientRequest and ClientResponse updated to contain Client ID and sequence number, new RPC format ClientIdRequest and ClientIdResponse. GenericClientRequest for sending a request to a Raft leader.
* `log.go`: Update log entry to contain client IDs and sequence numbers.
* `config.go`: Set interval at which to garbage collect cache and how long responses to client RPCs should remain cached
* `api.go`: client response cache and next client ID state added to each raft node and snapshot restoring operations.
* `snapshot.go`: Support for snapshotting the client response cache and the next client ID (must be stored persistently).
* `file_snapshot.go`: Support for snapshotting the client response cache and the next client ID.
* `inmem_snapshot.go`: Support for snapshotting the client response cache and the next client ID.
* `net_transport.go`: Add new RPC types.

Run tests for RIFL: `src/test/runTests.sh`
Currently has a race condition
Empty file modified src/raft/.gitignore
100644 → 100755
Empty file.
Empty file modified src/raft/.travis.yml
100644 → 100755
Empty file.
Empty file modified src/raft/LICENSE
100644 → 100755
Empty file.
Empty file modified src/raft/Makefile
100644 → 100755
Empty file.
Empty file modified src/raft/README.md
100644 → 100755
Empty file.
143 changes: 126 additions & 17 deletions src/raft/api.go
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package raft

import (
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -48,6 +49,36 @@ var (
// ErrCantBootstrap is returned when attempt is made to bootstrap a
// cluster that already has state present.
ErrCantBootstrap = errors.New("bootstrap only works on new clusters")

// ErrBadClientId is returned when a client issues a RPC with a client
// ID the cluster doesn't recognize.
ErrBadClientId = errors.New("bad client ID used")

// ErrNotCommutative is returned when a client tries to push an operation
// to a witness that is not commutative with other operations stored at
// the witness.
ErrNotCommutative = errors.New("operation not commutative with operations in witness")

// ErrNotWitness is returned when a client contacts a leader instead of
// a witness.
ErrNotWitness = errors.New("contacted leader instead of witness")

// ErrNoActiveServers is returned when a client tries to contact a cluster
// and cannot reach any servers.
ErrNoActiveServers = errors.New("no active raft servers found")

// ErrNoActiveLeader is returned when a client tries to contact a leader
// and cannot reach an active leader.
ErrNoActiveLeader = errors.New("no active leader found")

// ErrWitnessFrozen is returned when a client tries to record a command
// in a witness that cannot accept client record requests.
ErrWitnessFrozen = errors.New("witness cannot accept record request, frozen")

// ErrStaleTerm is returned when a client tries to record a command in
// a witness using a stale term number, meaning that it is sending the
// command to a potentially stale set of witnesses.
ErrStaleTerm = errors.New("witness cannot accept record request with stale term")
)

// Raft implements a Raft node.
Expand Down Expand Up @@ -81,6 +112,10 @@ type Raft struct {
// fsmSnapshotCh is used to trigger a new snapshot being taken
fsmSnapshotCh chan *reqSnapshotFuture

// True if witness can't accept client record requests, false otherwise.
frozen bool
frozenLock sync.RWMutex

// lastContact is the last time we had contact from the
// leader node. This can be used to gauge staleness.
lastContact time.Time
Expand Down Expand Up @@ -108,13 +143,22 @@ type Raft struct {
// LogStore provides durable storage for logs
logs LogStore

// Cache of client responses. Used for RIFL. Map of ClientIDs to
// map of client RPC sequence numbers to response data. Periodically
// garbage collected.
clientResponseCache map[uint64]map[uint64]clientResponseEntry
clientResponseLock sync.RWMutex

// Used to request the leader to make configuration changes.
configurationChangeCh chan *configurationChangeFuture

// Tracks the latest configuration and latest committed configuration from
// the log/snapshot.
configurations configurations

// Next Client ID to assign to new client. Used for RIFL.
nextClientId uint64

// RPC chan comes from the transport layer
rpcCh <-chan RPC

Expand Down Expand Up @@ -193,6 +237,9 @@ func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
return fmt.Errorf("failed to save current term: %v", err)
}

// Set empty maps for witness state
stableSetWitnessState(stable, make(map[ClientSeqNo]Log), make(map[uint32]Key))

// Append configuration entry to log.
entry := &Log{
Index: 1,
Expand Down Expand Up @@ -268,6 +315,8 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
// Attempt to restore any snapshots we find, newest to oldest.
var snapshotIndex uint64
var snapshotTerm uint64
var snapshotClientId uint64
var snapshotClientResponseCache map[uint64]map[uint64]clientResponseEntry
snapshots, err := snaps.List()
if err != nil {
return fmt.Errorf("failed to list snapshots: %v", err)
Expand All @@ -288,6 +337,8 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,

snapshotIndex = snapshot.Index
snapshotTerm = snapshot.Term
snapshotClientId = snapshot.NextClientId
snapshotClientResponseCache = snapshot.ClientResponseCache
break
}
if len(snapshots) > 0 && (snapshotIndex == 0 || snapshotTerm == 0) {
Expand All @@ -298,6 +349,8 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
// until we play back the Raft log entries.
lastIndex := snapshotIndex
lastTerm := snapshotTerm
lastClientId := snapshotClientId
lastClientResponseCache := snapshotClientResponseCache

// Apply any Raft log entries past the snapshot.
lastLogIndex, err := logs.LastIndex()
Expand All @@ -310,7 +363,25 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
return fmt.Errorf("failed to get log at index %d: %v", index, err)
}
if entry.Type == LogCommand {
_,_ = fsm.Apply(&entry)
resp := fsm.Apply(&entry)
data, err := json.Marshal(resp)
if err != nil {
return fmt.Errorf("failed to marshal response to command at index %d: %v", index, err)
}
clientCache, ok := lastClientResponseCache[entry.ClientID]
if !ok {
clientCache = make(map[uint64]clientResponseEntry)
}
clientCache[entry.SeqNo] = clientResponseEntry{
response: data,
timestamp: time.Now(), // will be garbage collected later
}
lastClientResponseCache[entry.ClientID] = clientCache
}
if entry.Type == LogNextClientId {
if err := decodeMsgPack(entry.Data, &lastClientId); err != nil {
panic(fmt.Errorf("failed to decode next cliend id: %v", err))
}
}
lastIndex = entry.Index
lastTerm = entry.Term
Expand All @@ -323,7 +394,7 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
return fmt.Errorf("failed to snapshot FSM: %v", err)
}
version := getSnapshotVersion(conf.ProtocolVersion)
sink, err := snaps.Create(version, lastIndex, lastTerm, configuration, 1, trans)
sink, err := snaps.Create(version, lastIndex, lastTerm, configuration, 1, lastClientId, lastClientResponseCache, trans)
if err != nil {
return fmt.Errorf("failed to create snapshot: %v", err)
}
Expand Down Expand Up @@ -399,6 +470,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
logger = conf.Logger
} else {
if conf.LogOutput == nil {
//devNull = open(os.devnull, 'w')
conf.LogOutput = os.Stderr
}
logger = log.New(conf.LogOutput, "", log.LstdFlags)
Expand Down Expand Up @@ -437,19 +509,22 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna

// Create Raft struct.
r := &Raft{
protocolVersion: protocolVersion,
applyCh: make(chan *logFuture),
conf: *conf,
fsm: fsm,
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
localID: localID,
localAddr: localAddr,
logger: logger,
logs: logs,
protocolVersion: protocolVersion,
applyCh: make(chan *logFuture),
conf: *conf,
clientResponseCache: make(map[uint64]map[uint64]clientResponseEntry),
frozen: false,
fsm: fsm,
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
localID: localID,
localAddr: localAddr,
logger: logger,
logs: logs,
configurationChangeCh: make(chan *configurationChangeFuture),
configurations: configurations{},
nextClientId: 0,
rpcCh: trans.Consumer(),
snapshots: snaps,
userSnapshotCh: make(chan *userSnapshotFuture),
Expand All @@ -461,7 +536,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
configurationsCh: make(chan *configurationsFuture, 8),
bootstrapCh: make(chan *bootstrapFuture),
observers: make(map[uint64]*Observer),
}
}

// Initialize as a follower.
r.setState(Follower)
Expand Down Expand Up @@ -505,6 +580,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
r.goFunc(r.run)
r.goFunc(r.runFSM)
r.goFunc(r.runSnapshots)
r.goFunc(r.runGcClientResponseCache)
return r, nil
}

Expand Down Expand Up @@ -599,18 +675,46 @@ func (r *Raft) Leader() ServerAddress {
// An optional timeout can be provided to limit the amount of time we wait
// for the command to be started. This must be run on the leader or it
// will fail.
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
func (r *Raft) Apply(log *Log, timeout time.Duration) ApplyFuture {
metrics.IncrCounter([]string{"raft", "apply"}, 1)
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}

// Create a log future, no index or term yet
logFuture := &logFuture{
log: *log,
}
logFuture.init()

select {
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.applyCh <- logFuture:
return logFuture
}
}

// Updates all Raft nodes with the value of NextClientId at the leader.
// This must be run at the leader.
func (r *Raft) SendNextClientId(timeout time.Duration) Future {
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}

buf, err := encodeMsgPack(r.nextClientId)
if err != nil {
panic(fmt.Errorf("failed to encode next client id: %v", err))
}

logFuture := &logFuture{
log: Log{
Type: LogCommand,
Data: cmd,
Type: LogNextClientId,
Data: buf.Bytes(),
},
}
logFuture.init()
Expand Down Expand Up @@ -1006,3 +1110,8 @@ func (r *Raft) LastIndex() uint64 {
func (r *Raft) AppliedIndex() uint64 {
return r.getLastApplied()
}

// Checks if raft node is the current leader.
func (r *Raft) IsLeader() bool {
return r.getState() == Leader
}
Empty file modified src/raft/bench/bench.go
100644 → 100755
Empty file.
47 changes: 47 additions & 0 deletions src/raft/client_response_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package raft

import (
"time"
)

// Manages the cache of client responses for use in RIFL, including
// garbage collecting the cache.

// clientResponseEntry holds state about the response to a client RPC.
// For use in RIFL.
type clientResponseEntry struct {
response interface{}
timestamp time.Time
}

// Continuously check to garbage collect the cache.
func (r *Raft) runGcClientResponseCache() {
for {
select {
case <-randomTimeout(r.conf.ClientResponseGcInterval):
r.gcClientResponseCache()

case <-r.shutdownCh:
return
}
}
}

// Garbage collect entries in the cache that have expired.
func (r *Raft) gcClientResponseCache() {
r.clientResponseLock.RLock()
currTime := time.Now()
for clientID, clientCache := range r.clientResponseCache {
for seqNo, entry := range clientCache {
if currTime.Sub(entry.timestamp) >= r.conf.ClientResponseGcRemoveTime {
r.clientResponseLock.RUnlock()
r.clientResponseLock.Lock()
delete(clientCache, seqNo) // does nothing if key does not exist, no race condition
r.clientResponseLock.Unlock()
r.clientResponseLock.RLock()
}
}
r.clientResponseCache[clientID] = clientCache
}
r.clientResponseLock.RUnlock()
}
Loading