吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 721|回复: 1
收起左侧

[求助] 6.824(6.5840) 6.5840 Lab 3: Raft

[复制链接]
红月亮 发表于 2024-8-4 11:03
66吾爱币
本帖最后由 红月亮 于 2024-8-5 13:31 编辑

6.5840 https://pdos.csail.mit.edu/6.824/labs/lab-raft.html
partB的日志复制无法通过测试,代码有什么可以改进的地方?

github地址:https://github.com/2451965602/6.5840

[Golang] 纯文本查看 复制代码
package raft

//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
//   create a new Raft server.
// rf.Start(command interface{}) (index, term, isleader)
//   start agreement on a new log entry
// rf.GetState() (term, isLeader)
//   ask a Raft for its current term, and whether it thinks it is leader
// ApplyMsg
//   each time a new entry is committed to the log, each Raft peer
//   should send an ApplyMsg to the service (or tester)
//   in the same server.
//

import (
        "github.com/sasha-s/go-deadlock"
        //        "bytes"
        "math/rand"
        "sync"
        "sync/atomic"
        "time"

        //        "6.5840/labgob"
        "6.5840/labrpc"
)

// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in part 3D you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh, but set CommandValid to false for these
// other uses.
type ApplyMsg struct {
        CommandValid bool
        Command      interface{}
        CommandIndex int

        // For 3D:
        SnapshotValid bool
        Snapshot      []byte
        SnapshotTerm  int
        SnapshotIndex int
}

type Entry struct {
        Term    int
        Command interface{}
        Index   int
}

// A Go object implementing a single Raft peer.
type Raft struct {
        mu        deadlock.Mutex      // Lock to protect shared access to this peer's state
        peers     []*labrpc.ClientEnd // RPC end points of all peers
        persister *Persister          // Object to hold this peer's persisted state
        me        int                 // this peer's index into peers[]
        dead      int32               // set by Kill()

        // Your data here (3A, 3B, 3C).
        state           string
        applyCh         chan ApplyMsg
        lastLeaderCheck time.Time

        currentTerm int
        votedFor    int
        logs        []Entry
        commitIndex int
        lastApplied int
        nextIndex   []int
        matchIndex  []int

        voteCount int
        // Look at the paper's Figure 2 for a description of what
        // state a Raft server must maintain.

}

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {

        var term int
        var isleader bool
        // Your code here (3A).

        term = rf.currentTerm
        isleader = rf.state == "Leader"

        //DPrintf("GetState: term=%d,me=%d,isleader=%v,state:%v", term, rf.me, isleader, rf.state)
        return term, isleader
}

// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
// before you've implemented snapshots, you should pass nil as the
// second argument to persister.Save().
// after you've implemented snapshots, pass the current snapshot
// (or nil if there's not yet a snapshot).
func (rf *Raft) persist() {
        // Your code here (3C).
        // Example:
        // w := new(bytes.Buffer)
        // e := labgob.NewEncoder(w)
        // e.Encode(rf.xxx)
        // e.Encode(rf.yyy)
        // raftstate := w.Bytes()
        // rf.persister.Save(raftstate, nil)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
        if data == nil || len(data) < 1 { // bootstrap without any state?
                return
        }
        // Your code here (3C).
        // Example:
        // r := bytes.NewBuffer(data)
        // d := labgob.NewDecoder(r)
        // var xxx
        // var yyy
        // if d.Decode(&xxx) != nil ||
        //    d.Decode(&yyy) != nil {
        //   error...
        // } else {
        //   rf.xxx = xxx
        //   rf.yyy = yyy
        // }
}

// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
        // Your code here (3D).

}

// example RequestVote RPC arguments structure.
// field names must start with capital letters!
type RequestVoteArgs struct {
        // Your data here (3A, 3B).
        Term         int
        CandidateId  int
        LastLogIndex int
        LastLogTerm  int
}

// example RequestVote RPC reply structure.
// field names must start with capital letters!
type RequestVoteReply struct {
        // Your data here (3A).
        VoteGranted bool
        Term        int
}

type AppendEntriesArgs struct {
        NewCommand   bool
        Term         int
        LeaderId     int
        PrevLogIndex int
        PrevLogTerm  int
        Logs         []Entry

        LeaderCommit int
}

type AppendEntriesReply struct {
        Term    int
        Success bool
}

// example RequestVote RPC handler.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
        // Your code here (3A, 3B).
        rf.mu.Lock()
        defer rf.mu.Unlock()

        if rf.killed() {
                reply.Term = -1
                reply.VoteGranted = false
                return
        }

        if args.Term < rf.currentTerm {
                reply.Term = rf.currentTerm
                reply.VoteGranted = false
                return
        }

        if args.Term > rf.currentTerm {
                rf.currentTerm = args.Term
                rf.state = "Follower"
                rf.votedFor = -1
        }

        lastLogIndex := len(rf.logs) - 1
        lastLogTerm := 0
        if lastLogIndex >= 0 {
                lastLogTerm = rf.logs[lastLogIndex].Term
        }

        if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && (args.LastLogTerm > lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {
                reply.VoteGranted = true
                rf.votedFor = args.CandidateId
        } else {
                reply.VoteGranted = false
        }
        reply.Term = rf.currentTerm

}

// example code to send a RequestVote RPC to a server.
// server is the index of the target server in rf.peers[].
// expects RPC arguments in args.
// fills in *reply with RPC reply, so caller should
// pass &reply.
// the types of the args and reply passed to Call() must be
// the same as the types of the arguments declared in the
// handler function (including whether they are pointers).
//
// The labrpc package simulates a lossy network, in which servers
// may be unreachable, and in which requests and replies may be lost.
// Call() sends a request and waits for a reply. If a reply arrives
// within a timeout interval, Call() returns true; otherwise
// Call() returns false. Thus Call() may not return for a while.
// A false return can be caused by a dead server, a live server that
// can't be reached, a lost request, or a lost reply.
//
// Call() is guaranteed to return (perhaps after a delay) *except* if the
// handler function on the server side does not return.  Thus there
// is no need to implement your own timeouts around Call().
//
// look at the comments in ../labrpc/labrpc.go for more details.
//
// if you're having trouble getting RPC to work, check that you've
// capitalized all field names in structs passed over RPC, and
// that the caller passes the address of the reply struct with &, not
// the struct itself.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
        ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
        return ok
}

// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
        index := -1
        term := -1
        isLeader := rf.state == "Leader"

        // Your code here (3B).

        if isLeader {
                rf.mu.Lock()
                index = len(rf.logs)
                term = rf.currentTerm
                log := Entry{Term: term, Command: command, Index: len(rf.logs)}
                rf.logs = append(rf.logs, log)
                rf.mu.Unlock()
        }

        return index, term, isLeader
}

// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
//
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
func (rf *Raft) Kill() {
        atomic.StoreInt32(&rf.dead, 1)
        // Your code here, if desired.
}

func (rf *Raft) killed() bool {
        z := atomic.LoadInt32(&rf.dead)
        return z == 1
}

func (rf *Raft) initLeaderState() {
        rf.nextIndex = make([]int, len(rf.peers))
        rf.matchIndex = make([]int, len(rf.peers))
        for i := range rf.peers {
                rf.nextIndex[i] = len(rf.logs)
                rf.matchIndex[i] = 0
        }
}

func (rf *Raft) startElement() {
        rf.mu.Lock()
        rf.state = "Candidate"
        rf.currentTerm++
        rf.voteCount = 1
        rf.votedFor = rf.me
        rf.lastLeaderCheck = time.Now()
        rf.mu.Unlock()
        var wg sync.WaitGroup
        for i := range rf.peers {
                if i != rf.me {
                        wg.Add(1)
                        go func(i int) {
                                defer wg.Done()
                                if rf.state == "Candidate" {

                                        args := &RequestVoteArgs{Term: rf.currentTerm, CandidateId: rf.me}
                                        if len(rf.logs) > 0 {
                                                args.LastLogIndex = len(rf.logs) - 1
                                                args.LastLogTerm = rf.logs[len(rf.logs)-1].Term
                                        }

                                        reply := &RequestVoteReply{}
                                        if rf.sendRequestVote(i, args, reply) {
                                                rf.mu.Lock()
                                                if reply.Term > rf.currentTerm {
                                                        rf.state = "Follower"
                                                        rf.currentTerm = reply.Term
                                                        rf.voteCount = 0
                                                        rf.votedFor = -1
                                                } else if reply.VoteGranted {
                                                        rf.voteCount++
                                                        if rf.voteCount >= (len(rf.peers)/2) && rf.state == "Candidate" {
                                                                rf.state = "Leader"
                                                                rf.initLeaderState()
                                                                go rf.sendAppendEntries()
                                                        }
                                                }
                                                rf.mu.Unlock()
                                        }
                                }
                        }(i)
                }
        }
        wg.Wait()
}

func (rf *Raft) ticker() {
        rd := rand.New(rand.NewSource(int64(rf.me)))
        for rf.killed() == false {

                // Your code here (3A)
                // Check if a leader election should be started.

                if rf.state == "Leader" {
                        rf.lastLeaderCheck = time.Now()
                        continue
                }
                ms := rd.Int63n(300)
                if time.Since(rf.lastLeaderCheck) > time.Duration(500+ms)*time.Millisecond || rf.state == "Candidate" {
                        rf.startElement()
                }

                // pause for a random amount of time between 50 and 350
                // milliseconds.
                ms = 50 + (rand.Int63() % 300)
                time.Sleep(time.Duration(ms) * time.Millisecond)
        }
}

func min(a, b int) int {
        if a < b {
                return a
        }
        return b
}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {

        rf.mu.Lock()
        defer rf.mu.Unlock()

        rf.lastLeaderCheck = time.Now()

        if args.Term < rf.currentTerm {
                reply.Term = rf.currentTerm
                reply.Success = false
                return
        }

        if args.Term > rf.currentTerm {
                rf.currentTerm = args.Term
                rf.state = "Follower"
                rf.votedFor = -1
        }
        //DPrintf("AppendEntries:term=%d,me=%d,args.Term=%d,logs=%v,newlogs=%v,preind=%v,preterm=%v,state=%v", rf.currentTerm, rf.me, args.Term, rf.logs, args.Logs, args.PrevLogIndex, args.PrevLogTerm, rf.state)

        if args.PrevLogIndex > len(rf.logs)-1 || rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
                reply.Term = rf.currentTerm
                reply.Success = false
                return
        }

        if len(args.Logs) != 0 && len(rf.logs) > args.PrevLogIndex+1 && rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
                rf.logs = rf.logs[:args.PrevLogIndex+1]
        }
        if rf.me != args.LeaderId {
                rf.logs = append(rf.logs, args.Logs...)
        }

        if args.LeaderCommit > rf.commitIndex {
                rf.commitIndex = min(args.LeaderCommit, len(rf.logs)-1)
                rf.applyLogs()
        }
        reply.Term = rf.currentTerm
        reply.Success = true
        //DPrintf("AppendEntries:state:%v,term=%d,me=%d,args.Term=%d", rf.state, rf.currentTerm, rf.me, args.Term)
}

func (rf *Raft) applyLogs() {
        for rf.lastApplied < rf.commitIndex {
                rf.lastApplied++
                msg := ApplyMsg{
                        CommandValid: true,
                        Command:      rf.logs[rf.lastApplied].Command,
                        CommandIndex: rf.lastApplied,
                }
                rf.applyCh <- msg
        }
}

func (rf *Raft) updateCommitIndex() {
        for n := len(rf.logs) - 1; n > rf.commitIndex; n-- {
                count := 1
                for i := range rf.peers {
                        if i != rf.me && rf.matchIndex[i] >= n && rf.logs[n].Term == rf.currentTerm {
                                count++
                        }
                }
                if count > len(rf.peers)/2 {
                        rf.commitIndex = n
                        rf.applyLogs()
                        break
                }
        }
}

func (rf *Raft) sendAppendEntries() {

        for rf.killed() == false {
                if rf.state == "Leader" {

                        var wg sync.WaitGroup
                        for i := range rf.peers {
                                wg.Add(1)
                                go func(i int) {
                                        defer wg.Done()
                                        rf.mu.Lock()

                                        if rf.state != "Leader" {
                                                rf.mu.Unlock()
                                                return
                                        }

                                        args := &AppendEntriesArgs{Term: rf.currentTerm, LeaderId: rf.me, LeaderCommit: rf.commitIndex, PrevLogIndex: rf.matchIndex[i], PrevLogTerm: rf.logs[rf.matchIndex[i]].Term}

                                        if rf.nextIndex[i] < len(rf.logs) {
                                                args.Logs = rf.logs[rf.nextIndex[i]:]
                                        } else {
                                                args.Logs = make([]Entry, 0)
                                        }
                                        rf.mu.Unlock()

                                        reply := &AppendEntriesReply{}

                                        ok := rf.peers[i].Call("Raft.AppendEntries", args, reply)

                                        rf.mu.Lock()
                                        if ok {
                                                if reply.Success {
                                                        rf.nextIndex[i] += len(args.Logs)
                                                        rf.matchIndex[i] = rf.nextIndex[i] - 1
                                                        rf.updateCommitIndex()
                                                } else {
                                                        if reply.Term > rf.currentTerm {
                                                                rf.state = "Follower"
                                                                rf.votedFor = -1
                                                                rf.currentTerm = reply.Term
                                                                rf.lastLeaderCheck = time.Now()
                                                        } else if reply.Term == rf.currentTerm && rf.state == "Leader" && rf.nextIndex[i] > 0 {
                                                                rf.nextIndex[i]--
                                                        }
                                                }
                                        }
                                        rf.mu.Unlock()

                                }(i)

                        }
                        wg.Wait()
                } else {
                        return
                }

                ms := 100 + (rand.Int63() % 20)
                time.Sleep(time.Duration(ms) * time.Millisecond)

        }

}

// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft {
        rf := &Raft{}
        rf.peers = peers
        rf.persister = persister
        rf.me = me

        // Your initialization code here (3A, 3B, 3C).
        rf.state = "Follower"
        rf.applyCh = applyCh

        rf.nextIndex = make([]int, len(peers))
        for i := range rf.nextIndex {
                rf.nextIndex[i] = 1
        }
        rf.logs = make([]Entry, 0)
        rf.logs = append(rf.logs, Entry{Term: 0})

        // initialize from state persisted before a crash
        rf.readPersist(persister.ReadRaftState())

        // start ticker goroutine to start elections
        go rf.ticker()

        return rf
}

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

923590810 发表于 2024-8-5 22:22
本帖最后由 923590810 于 2024-8-5 22:23 编辑

在实现Raft算法的日志复制部分时,可能会遇到各种问题。以下是一些常见的改进点和建议:

### 1. 确保正确的锁机制
确保在访问共享状态时正确使用锁,以防止数据竞争和不一致。

### 2. 确保`AppendEntries`的正确性
检查`AppendEntries`的实现,确保它正确处理了各种情况,包括日志不一致的情况。

### 3. 确保日志条目正确追加
确保在追加日志条目时,正确处理了日志冲突和覆盖。

### 4. 确保提交索引正确更新
确保在日志条目复制到多数服务器后,正确更新提交索引并应用日志条目。

### 5. 确保心跳机制正常工作
确保领导者定期发送心跳(空的`AppendEntries`)来保持领导者地位。

### 详细检查代码的几个关键部分:

#### 1. `AppendEntries`函数
确保`AppendEntries`函数正确处理了各种情况,包括日志不一致、日志追加和提交索引更新。

```go
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    rf.lastLeaderCheck = time.Now()

    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        reply.Success = false
        return
    }

    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.state = "Follower"
        rf.votedFor = -1
    }

    // 检查前一个日志条目是否匹配
    if args.PrevLogIndex >= len(rf.logs) || rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
        reply.Term = rf.currentTerm
        reply.Success = false
        return
    }

    // 追加新日志条目
    rf.logs = append(rf.logs[:args.PrevLogIndex+1], args.Logs...)
   
    // 更新提交索引
    if args.LeaderCommit > rf.commitIndex {
        rf.commitIndex = min(args.LeaderCommit, len(rf.logs)-1)
        rf.applyLogs()
    }

    reply.Term = rf.currentTerm
    reply.Success = true
}
```

#### 2. `sendAppendEntries`函数
确保领导者定期发送`AppendEntries` RPC,并正确处理回复。

```go
func (rf *Raft) sendAppendEntries() {
    for rf.killed() == false {
        if rf.state == "Leader" {
            var wg sync.WaitGroup
            for i := range rf.peers {
                if i != rf.me {
                    wg.Add(1)
                    go func(i int) {
                        defer wg.Done()
                        rf.mu.Lock()
                        if rf.state != "Leader" {
                            rf.mu.Unlock()
                            return
                        }

                        prevLogIndex := rf.nextIndex - 1
                        prevLogTerm := rf.logs[prevLogIndex].Term
                        entries := make([]Entry, len(rf.logs[prevLogIndex+1:]))
                        copy(entries, rf.logs[prevLogIndex+1:])

                        args := &AppendEntriesArgs{
                            Term:         rf.currentTerm,
                            LeaderId:     rf.me,
                            PrevLogIndex: prevLogIndex,
                            PrevLogTerm:  prevLogTerm,
                            Logs:         entries,
                            LeaderCommit: rf.commitIndex,
                        }
                        rf.mu.Unlock()

                        reply := &AppendEntriesReply{}
                        ok := rf.peers.Call("Raft.AppendEntries", args, reply)

                        rf.mu.Lock()
                        if ok {
                            if reply.Success {
                                rf.nextIndex = prevLogIndex + len(entries) + 1
                                rf.matchIndex = rf.nextIndex - 1
                                rf.updateCommitIndex()
                            } else {
                                if reply.Term > rf.currentTerm {
                                    rf.state = "Follower"
                                    rf.currentTerm = reply.Term
                                    rf.votedFor = -1
                                } else {
                                    rf.nextIndex = max(1, rf.nextIndex-1)
                                }
                            }
                        }
                        rf.mu.Unlock()
                    }(i)
                }
            }
            wg.Wait()
        }
        ms := 100 + (rand.Int63() % 20)
        time.Sleep(time.Duration(ms) * time.Millisecond)
    }
}
```

#### 3. `updateCommitIndex`函数
确保在多数服务器复制日志条目后,正确更新提交索引。

```go
func (rf *Raft) updateCommitIndex() {
    for n := len(rf.logs) - 1; n > rf.commitIndex; n-- {
        count := 1
        for i := range rf.peers {
            if i != rf.me && rf.matchIndex >= n && rf.logs[n].Term == rf.currentTerm {
                count++
            }
        }
        if count > len(rf.peers)/2 {
            rf.commitIndex = n
            rf.applyLogs()
            break
        }
    }
}
```

### 其他注意事项

- **日志压缩和快照**:如果日志条目过多,可能需要实现日志压缩和快照功能,以减少日志大小和提高效率。
- **网络分区处理**:确保在网络分区恢复后,日志能够正确同步和一致。

通过检查和改进上述关键部分,可以提高日志复制功能的正确性和鲁棒性,确保通过测试。GPT-4o给你的回答
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-12-14 21:43

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表