CS144 Lab4 The Summit (TCP in full)

Lab 0 实现了flow-control ByteStream;Lab1、2 实现了TCPReceiver; Lab3实现了TCPSender。
Lab4则是要将这几个模块整合起来,完成TCP。前几个lab过于简单,而Lab4才开始上强度了。如果你之前的实现有没被测试cover的bug,那这里也会揭露出来。

请添加图片描述

理论

这个Lab也展示了TCP的**三次握手(three-way handshake)**。

在这里插入图片描述
如上图,前两次握手很好理解,就是发送初始seqno 和 接收端的 ack seqno。第三次握手则是为了建立双向通信,即客户也要 ack 服务器的 初始seqno。所以某些只需要单向传输的协议就只需要前两次握手了。

一个syn with 初始seqno和一个回复的ack,就可以建立一个单向传输。而双方各建立一个单向传输管道,就形成了TCP的双向通信。将回复的ack和反向的seq合并成一个TCP segment,就可以将四次握手压缩成三次握手。

在这里插入图片描述
四次挥手(four-way handshake)也同理。三次握手相当于建立了两个单向传输管道,那么结束连接不就是拆除这俩个单向传输管道吗?拆除一个管道需要 sender发送FIN,receiver收到FIN后回复ack,即两个TCP segment或者说两次“握手”。那么拆除两个双向管道就需要四次握手。

当某一方发送完毕,发送FIN拆除一个管道后,就只剩下一个管道,TCP就退化成了只能单向传输的单向管道。当剩下管道的sender也结束发送,那就再通过两次握手拆除剩下的一个管道。

因为是否拆除某个管道是由这个管道的sender是否发送完信息决定的,两个sender大概率不是同时结束发送信息,所以无法将第一个ACK和第二个FIN合并,所以只能通过四次握手(handshake)解除连接。理解了这些你就也会想到,可以定义一个新的protocol,任何一方发送完毕结束连接时另一方向管道也强行拆除,这样就可以将第一个ACK和第二个FIN合并,从而三次挥手即可结束连接。这也是某些轻量化魔改TCP的实现方式。

思路

TCP就是个双向传递可靠字节流的“管道”,将不可靠的IP层整理成可靠的字节流。每个端口都要即可发送又可接收。前三个lab实现的都是具体的功能,说实话就是两个特殊点的数据结构。而本lab主要负责管理连接,属于TCP的真正核心部分。

请添加图片描述
实现共分为三个模块:接收发送时间流逝

请添加图片描述

接收

当接收segments时,即 segment_received() 被调用:

  1. RST flag 为true(就是上图TCP里的那个RST),将出入流均设为error,关闭连接。若RST为false则执行以下情况:
  2. 将segment传入receiver进行解析。
  3. ACK flag为true,则将其 acknowindow_size 传给 sender。
  4. 如果传来的segment包含seqno,则至少回复一个segment,以更新acknowindow_size

这里你会发现,收到的这个segment有可能同时被receiver和sender利用。可看上图中TCP结构, sender 和 receiver所用到的红色蓝色的字节是完全独立的。换句话说,TCP在同时接收和发送时可以将 sender发送payload的segmentreceiver回复ack segment 合并成一个segment,这就是piggybacking (捎带)。

发送

  1. 当sender在其outgoing queue生成TCP segment时,已经将其负责的 seqno, syn, payloadfin 填写好了。
  2. 在发送从sender获得TCP segment前,TCP Connection需要询问receiver是否需要捎带 acknowindow_size

时间流逝

TCP Connection 和 sender 一样,有一个tick() 方法,会被操作系统周期性调用。被调用时:

  1. 调用sender的tick(),告诉其时间的流逝。
  2. 当sender的重发次数超过TCPConfig::MAX_RETX_ATTEMPTS,中断连接,发送一个reset segment给peer(空的segment with RST flag)。
  3. 必要的时候,彻底关闭连接。

关闭

关闭连接是本lab最难的一个点了。有两种关闭连接的方式,unclean shutdownclean shutdown

Unclean shundown 是当收到或者发送 reset segment的时候,将inboundoutbound bytestream均设为error state, 并且active() 立即返回 false。这也是一种出现错误时候的异常退出方式。

Clean shutdown 就是常规退出,会更加复杂,因为这要保证双方都可靠地传递完所有信息。因为two general problem,理论上不可能保证两方都可靠地传递完所有信息,但TCP会尽力近似这个理想状态。

对于一方来说,达到 clean shutdown 需要四个前提条件:

  1. IP来的 inbound stream 重组完毕并且结束。
  2. Application层来的 outbound stream 结束并发送完毕。
  3. 发送出去的TCP segment全部被ack。可以直接理解成:再也不需要向对方发送segment了(指发送信息,ack不算在内)。
  4. 确保对面端满足了条件3,即ack了所有对面发送的TCP segment。可以直接理解成:对方永远不会再发来segment了。

前三条很容易实现,但第四点难以实现,因为本地很难知道对方发送的segment有没有在路上丢失,所以TCP有两种方式来近似实现条件4。可以看我对条件3,4的更直观的翻译,条件4就是防止:对面还需要发segment过来,但本地已经关闭了,无法收到发过来的segment,出现错误。

方式A: 两个通信管道结束后等待一段时间。因为条件4是为了让对面满足条件3,所以此时本地作为receiver,对面作为sender。对方发来FIN后,即使本地全部ack了,有可能还会继续重发其他信息(因为TCP不会ack ack,本地发的ack有可能丢失,所以对面sender有可能没收到ack从而不满足条件3)。所以要等待一段时间,等着接收sender重发到segment,本地再重发ack。总的来说就是,等一段时间,让本地多重发几次ack,以增加对面sender接收到ack的概率,进而增加对面sender满足条件3的概率。

方式B如果对方是先发完的一方,那满足1~3后就可以直接关闭,即passive close。因为对方是先发送完所有内容并发送FIN关闭连接,所以肯定是已经知道发送的所有segment被ack了,否则还会继续重发而不会关闭连接。因此对方已经满足了条件3,即本地满足了条件4,所以可以直接关闭连接。

总结,可见上面四次挥手的图。Client先满足条件3,等一旦满足条件4后即可关闭(方式A);Server先满足条件4,等一旦满足条件3后即可关闭(方式B)

实现

github仓库

lab4的测试分为两部分,第一部分简单的单元测试,第二部分利用shell脚本进行压力测试。

而第二部分测试的时候我才发觉我lab1、2实现不够robust,然后debug了很久,这个lab4算是给了我模块实现robustness重要性的教训。

不能对ack进行ack回复,否则会造成ack的无限回复循环。我的实现发送ack有两种方式:一是直接回复一个空的ack segment,二是将ack 和window_size piggybacking到一个sender的segment里。所以只需判断:当segment_received()收到一个只有ack的segment时(即,只能用第二种方式发送ack,而不能新建一个ack segment。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class TCPConnection {
private:
TCPConfig _cfg;
TCPReceiver _receiver{_cfg.recv_capacity};
TCPSender _sender{_cfg.send_capacity, _cfg.rt_timeout, _cfg.fixed_isn};

//! outbound queue of segments that the TCPConnection wants sent
std::queue<TCPSegment> _segments_out{};

//! Should the TCPConnection stay active (and keep ACKing)
//! for 10 * _cfg.rt_timeout milliseconds after both streams have ended,
//! in case the remote TCPConnection doesn't know we've received its whole stream?
bool _linger_after_streams_finish{true};

// connection active flag.
bool _active{true};

// Has sent syn or not.
bool _sent_syn{false};

// Time since last segment received.
size_t _time_since_last_received{0};

// Grip segments from _sender, and push them to _segments_out.
void send_segments();

// Send segment with rst flag.
void send_rst_segment();

public:
... ...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); }

size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); }

size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); }

size_t TCPConnection::time_since_last_segment_received() const { return _time_since_last_received; }

void TCPConnection::segment_received(const TCPSegment &seg) {
_time_since_last_received = 0;

auto &header = seg.header();

// Receiver.
_receiver.segment_received(seg);

if (header.rst) {
// set both to error state, and end connection.
_sender.stream_in().set_error();
_receiver.stream_out().set_error();
_active = false;
_linger_after_streams_finish = false;
return;
}

// Sender.
// Has sent syn and is ack seg: normal ack.
if (_sent_syn && header.ack) {
_sender.ack_received(header.ackno, header.win);
}

// Try to connect, send syn ack.
if (TCPState::state_summary(_receiver) == TCPReceiverStateSummary::SYN_RECV &&
TCPState::state_summary(_sender) == TCPSenderStateSummary::CLOSED) {
connect();
return;
}

// Try to set _linger_after_streams_finish = false.
// CLOSE_WAIT.
if (TCPState::state_summary(_receiver) == TCPReceiverStateSummary::FIN_RECV &&
TCPState::state_summary(_sender) == TCPSenderStateSummary::SYN_ACKED) {
_linger_after_streams_finish = false;
}

// If _reciever has closed, directly close cleanly without lingering.
// CLOSED.
if (TCPState::state_summary(_receiver) == TCPReceiverStateSummary::FIN_RECV &&
TCPState::state_summary(_sender) == TCPSenderStateSummary::FIN_ACKED && _linger_after_streams_finish == false) {
_active = false;
return;
}

// Receiver reply.
// If seg is a single ack segment, don't need to re-ack.
if (seg.length_in_sequence_space() != 0) {
_sender.send_empty_segment();
}
send_segments();
}

bool TCPConnection::active() const { return _active; }

size_t TCPConnection::write(const string &data) {
size_t write_size = _sender.stream_in().write(data);
_sender.fill_window();
send_segments();
return write_size;
}

//! \param[in] ms_since_last_tick number of milliseconds since the last call to this method
void TCPConnection::tick(const size_t ms_since_last_tick) {
_time_since_last_received += ms_since_last_tick;

if (!_sent_syn) {
return;
}

// Transformsmission too many times, abort.
if (_sender.consecutive_retransmissions() >= _cfg.MAX_RETX_ATTEMPTS) {
send_rst_segment();
return;
}

// Retransmission.
_sender.tick(ms_since_last_tick);
send_segments();

// Lingering, and close connection cleanly.
if (TCPState::state_summary(_receiver) == TCPReceiverStateSummary::FIN_RECV &&
TCPState::state_summary(_sender) == TCPSenderStateSummary::FIN_ACKED && _linger_after_streams_finish &&
_time_since_last_received >= 10 * _cfg.rt_timeout) {
_active = false;
_linger_after_streams_finish = false;
}
}

void TCPConnection::end_input_stream() {
if (!_sent_syn) {
return;
}
_sender.stream_in().end_input();
// Send FIN segment.
_sender.fill_window();
send_segments();
}

void TCPConnection::connect() {
_sender.fill_window();
_sent_syn = true;
_active = true;
send_segments();
}

TCPConnection::~TCPConnection() {
try {
if (active()) {
cerr << "Warning: Unclean shutdown of TCPConnection\n";
// Your code here: need to send a RST segment to the peer
send_rst_segment();
}
} catch (const exception &e) {
std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
}
}

void TCPConnection::send_segments() {
auto re_ack = _receiver.ackno();
auto &sender_out = _sender.segments_out();
while (!sender_out.empty()) {
if (re_ack.has_value()) {
sender_out.front().header().ack = true;
sender_out.front().header().ackno = _receiver.ackno().value();
sender_out.front().header().win = _receiver.window_size();
}
_segments_out.push(std::move(sender_out.front()));
sender_out.pop();
}
}

void TCPConnection::send_rst_segment() {
// Clear out _segments_out.
while (!_segments_out.empty()) {
_segments_out.pop();
}

// Generate rst segment.
TCPSegment seg;
seg.header().rst = true;

// Send segments.
_segments_out.push(std::move(seg));

_sender.stream_in().set_error();
_receiver.stream_out().set_error();
_active = false;
_linger_after_streams_finish = false;
}

测试

我没有像有的人一样遇到一些环境问题,用很新的Ubuntu24.04,g++ downgrade到9.0。
请添加图片描述

请添加图片描述

Benchmark 合格但较低,懒得优化了。这里有一些优化过程参考

请添加图片描述

然后,就可以在修改lab0的webget中TCP成我们写的TCP,进行通讯。

请添加图片描述

参考资料

https://lrl52.top/1015/cs144-lablab4/

https://kiprey.github.io/2021/11/cs144-lab4/

https://blog.csdn.net/hkhl_235/article/details/79721645