diff --git a/examples/client.cc b/examples/client.cc index ba4a3f3699f646e0aefc93254ad7ab2e1dc6bd83..9040087ad9660cbef71851dc8538fd952ec1b9eb 100644 --- a/examples/client.cc +++ b/examples/client.cc @@ -54,6 +54,11 @@ namespace { Config config{}; } // namespace +Buffer::Buffer(const uint8_t *data, size_t datalen) + : buf{data, data + datalen}, pos(std::begin(buf)) {} + +Buffer::Buffer() : pos(std::begin(buf)) {} + namespace { int bio_write(BIO *b, const char *buf, int len) { int rv; @@ -138,7 +143,13 @@ BIO_METHOD *create_bio_method() { } // namespace namespace { -void writecb(struct ev_loop *loop, ev_io *w, int revents) {} +void writecb(struct ev_loop *loop, ev_io *w, int revents) { + auto c = static_cast<Client *>(w->data); + + if (c->on_write() != 0) { + c->disconnect(); + } +} } // namespace namespace { @@ -192,10 +203,13 @@ Client::Client(struct ev_loop *loop, SSL_CTX *ssl_ctx) stdinfd_(-1), stream_id_(0), chandshake_idx_(0), + tx_stream0_offset_(0), nsread_(0), conn_(nullptr), crypto_ctx_{}, - streambuf_idx_(0) { + streambuf_idx_(0), + tx_stream_offset_(0), + should_send_fin_(false) { ev_io_init(&wev_, writecb, 0, EV_WRITE); ev_io_init(&rev_, readcb, 0, EV_READ); ev_io_init(&stdinrev_, stdin_readcb, 0, EV_READ); @@ -286,7 +300,7 @@ int recv_stream_data(ngtcp2_conn *conn, uint32_t stream_id, uint8_t fin, const uint8_t *data, size_t datalen, void *user_data, void *stream_user_data) { debug::print_stream_data(stream_id, data, datalen); - + ngtcp2_conn_extend_max_stream_offset(conn, stream_id, datalen); return 0; } } // namespace @@ -410,8 +424,8 @@ int Client::init(int fd, const Address &remote_addr, const char *addr, 0, std::numeric_limits<uint64_t>::max())(randgen); ngtcp2_settings settings; - settings.max_stream_data = 128_k; - settings.max_data = 128; + settings.max_stream_data = 64_k; + settings.max_data = 64; settings.max_stream_id = 0; settings.idle_timeout = 5; settings.omit_connection_id = 0; @@ -514,6 +528,26 @@ int Client::on_write() { std::array<uint8_t, NGTCP2_MAX_PKTLEN_IPV4> buf; assert(buf.size() >= max_pktlen_); + for (auto it = std::begin(streambuf_) + streambuf_idx_; + it != std::end(streambuf_); ++it) { + auto &v = *it; + if (on_write_stream(stream_id_, 0, v) != 0) { + return -1; + } + if (v.left() > 0) { + break; + } + ++streambuf_idx_; + } + + if (streambuf_idx_ == streambuf_.size() && should_send_fin_) { + should_send_fin_ = false; + auto v = Buffer{}; + if (on_write_stream(stream_id_, 1, v) != 0) { + return -1; + } + } + for (;;) { auto n = ngtcp2_conn_send(conn_, buf.data(), max_pktlen_, util::timestamp()); @@ -522,8 +556,7 @@ int Client::on_write() { return -1; } if (n == 0) { - schedule_retransmit(); - return 0; + break; } if (debug::packet_lost(config.tx_loss_prob)) { @@ -537,30 +570,34 @@ int Client::on_write() { return -1; } } + + schedule_retransmit(); + return 0; } -int Client::on_write_stream(uint32_t stream_id, uint8_t fin, - const uint8_t *data, size_t datalen) { +int Client::on_write_stream(uint32_t stream_id, uint8_t fin, Buffer &data) { std::array<uint8_t, NGTCP2_MAX_PKTLEN_IPV4> buf; assert(buf.size() >= max_pktlen_); size_t ndatalen; for (;;) { auto n = ngtcp2_conn_write_stream(conn_, buf.data(), max_pktlen_, &ndatalen, - stream_id, fin, data, datalen, + stream_id, fin, data.rpos(), data.left(), util::timestamp()); if (n < 0) { + if (n == NGTCP2_ERR_STREAM_DATA_BLOCKED) { + return 0; + } std::cerr << "ngtcp2_conn_write_stream: " << ngtcp2_strerror(n) << std::endl; return -1; } - data += ndatalen; - datalen -= ndatalen; + data.pos += ndatalen; if (debug::packet_lost(config.tx_loss_prob)) { std::cerr << "** Simulated outgoing packet loss **" << std::endl; - if (datalen == 0) { + if (data.left() == 0) { break; } continue; @@ -572,14 +609,11 @@ int Client::on_write_stream(uint32_t stream_id, uint8_t fin, return -1; } - if (datalen == 0) { + if (data.left() == 0) { break; } } - if (datalen == 0) { - schedule_retransmit(); - } return 0; } @@ -602,7 +636,7 @@ void Client::schedule_retransmit() { } int Client::write_client_handshake(const uint8_t *data, size_t datalen) { - chandshake_.emplace_back(data, data + datalen); + chandshake_.emplace_back(data, datalen); return 0; } @@ -611,8 +645,8 @@ size_t Client::read_client_handshake(const uint8_t **pdest) { return 0; } const auto &v = chandshake_[chandshake_idx_++]; - *pdest = v.data(); - return v.size(); + *pdest = v.buf.data(); + return v.buf.size(); } size_t Client::read_server_handshake(uint8_t *buf, size_t buflen) { @@ -753,37 +787,32 @@ int Client::send_interactive_input() { return stop_interactive_input(); } - streambuf_.emplace_back(std::begin(buf), std::end(buf)); - const auto &v = streambuf_.back(); + streambuf_.emplace_back(buf.data(), nread); - rv = on_write_stream(stream_id_, 0, v.data(), nread); - if (rv != 0) { - return -1; - } + ev_feed_event(loop_, &wev_, EV_WRITE); - return on_write(); + return 0; } int Client::stop_interactive_input() { int rv; - rv = on_write_stream(stream_id_, 1, nullptr, 0); - if (rv != 0) { - return -1; - } + should_send_fin_ = true; ev_io_stop(loop_, &stdinrev_); std::cerr << "Interactive session has ended." << std::endl; - return on_write(); + ev_feed_event(loop_, &wev_, EV_WRITE); + + return 0; } namespace { -void remove_tx_stream_data(std::deque<std::vector<uint8_t>> &d, size_t &idx, - size_t datalen) { - for (; !d.empty() && d.front().size() <= datalen;) { +void remove_tx_stream_data(std::deque<Buffer> &d, size_t &idx, + uint64_t &tx_offset, uint64_t offset) { + for (; !d.empty() && tx_offset + d.front().buf.size() <= offset;) { --idx; - datalen -= d.front().size(); + tx_offset += d.front().buf.size(); d.pop_front(); } } @@ -792,10 +821,12 @@ void remove_tx_stream_data(std::deque<std::vector<uint8_t>> &d, size_t &idx, void Client::remove_tx_stream_data(uint32_t stream_id, uint64_t offset, size_t datalen) { if (stream_id == 0) { - ::remove_tx_stream_data(chandshake_, chandshake_idx_, datalen); + ::remove_tx_stream_data(chandshake_, chandshake_idx_, tx_stream0_offset_, + offset + datalen); return; } - ::remove_tx_stream_data(streambuf_, streambuf_idx_, datalen); + ::remove_tx_stream_data(streambuf_, streambuf_idx_, tx_stream_offset_, + offset + datalen); } namespace { diff --git a/examples/client.h b/examples/client.h index 85be46938380013e2e592f74ff0109372489abf5..d347d48387c99ea578ba18043c7f1c3f75f3e1f9 100644 --- a/examples/client.h +++ b/examples/client.h @@ -53,6 +53,18 @@ struct Config { int fd; }; +struct Buffer { + Buffer(const uint8_t *data, size_t datalen); + Buffer(); + size_t left() const { return std::end(buf) - pos; } + const uint8_t *rpos() const { + return buf.data() + std::distance(std::begin(buf), pos); + } + + std::vector<uint8_t> buf; + std::vector<uint8_t>::const_iterator pos; +}; + class Client { public: Client(struct ev_loop *loop, SSL_CTX *ssl_ctx); @@ -64,8 +76,7 @@ public: int tls_handshake(); int on_read(); int on_write(); - int on_write_stream(uint32_t stream_id, uint8_t fin, const uint8_t *data, - size_t datalen); + int on_write_stream(uint32_t stream_id, uint8_t fin, Buffer &data); int feed_data(uint8_t *data, size_t datalen); void schedule_retransmit(); @@ -105,14 +116,23 @@ private: int fd_; int stdinfd_; uint32_t stream_id_; - std::deque<std::vector<uint8_t>> chandshake_; + std::deque<Buffer> chandshake_; + // chandshake_idx_ is the index in chandshake_, which points to the + // buffer to read next. size_t chandshake_idx_; + uint64_t tx_stream0_offset_; std::vector<uint8_t> shandshake_; size_t nsread_; ngtcp2_conn *conn_; crypto::Context crypto_ctx_; - std::deque<std::vector<uint8_t>> streambuf_; + std::deque<Buffer> streambuf_; + // streambuf_idx_ is the index in streambuf_, which points to the + // buffer to send next. size_t streambuf_idx_; + // tx_stream_offset_ is the offset where all data before offset is + // acked by the remote endpoint. + uint64_t tx_stream_offset_; + bool should_send_fin_; }; #endif // CLIENT_H diff --git a/examples/server.cc b/examples/server.cc index 387bec4365689da428d787df3d42c3f13e97817a..fc7938c5b642a32f99550b1c5d9b17366f2f46a5 100644 --- a/examples/server.cc +++ b/examples/server.cc @@ -53,6 +53,11 @@ namespace { Config config{}; } // namespace +Buffer::Buffer(const uint8_t *data, size_t datalen) + : buf{data, data + datalen}, pos(std::begin(buf)) {} + +Buffer::Buffer() : pos(std::begin(buf)) {} + namespace { int bio_write(BIO *b, const char *buf, int len) { int rv; @@ -137,7 +142,10 @@ BIO_METHOD *create_bio_method() { } // namespace Stream::Stream(uint32_t stream_id) - : stream_id(stream_id), streambuf_idx(0), should_send_fin(false) {} + : stream_id(stream_id), + streambuf_idx(0), + tx_stream_offset(0), + should_send_fin(false) {} namespace { void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) { @@ -175,7 +183,8 @@ Handler::Handler(struct ev_loop *loop, SSL_CTX *ssl_ctx, Server *server) conn_(nullptr), crypto_ctx_{}, conn_id_(std::uniform_int_distribution<uint64_t>( - 0, std::numeric_limits<uint64_t>::max())(randgen)) { + 0, std::numeric_limits<uint64_t>::max())(randgen)), + tx_stream0_offset_(0) { ev_timer_init(&timer_, timeoutcb, 0., 30.); timer_.data = this; ev_timer_init(&rttimer_, retransmitcb, 0., 0.); @@ -354,8 +363,8 @@ int Handler::init(int fd, const sockaddr *sa, socklen_t salen) { ngtcp2_settings settings; - settings.max_stream_data = 128_k; - settings.max_data = 128; + settings.max_stream_data = 64_k; + settings.max_data = 64; // TODO Just allow stream ID = 1 to exchange encrypted data for now. settings.max_stream_id = 1; settings.idle_timeout = 5; @@ -404,7 +413,7 @@ int Handler::tls_handshake() { } int Handler::write_server_handshake(const uint8_t *data, size_t datalen) { - shandshake_.emplace_back(data, data + datalen); + shandshake_.emplace_back(data, datalen); return 0; } @@ -412,9 +421,11 @@ size_t Handler::read_server_handshake(const uint8_t **pdest) { if (shandshake_idx_ == shandshake_.size()) { return 0; } - const auto &v = shandshake_[shandshake_idx_++]; - *pdest = v.data(); - return v.size(); + auto &v = shandshake_[shandshake_idx_++]; + *pdest = v.rpos(); + auto left = v.left(); + v.pos += left; + return left; } size_t Handler::read_client_handshake(uint8_t *buf, size_t buflen) { @@ -558,8 +569,7 @@ int Handler::on_write() { return -1; } if (n == 0) { - schedule_retransmit(); - return 0; + break; } if (debug::packet_lost(config.tx_loss_prob)) { @@ -574,6 +584,9 @@ int Handler::on_write() { return -1; } } + + schedule_retransmit(); + return 0; } int Handler::on_write_stream(Stream &stream) { @@ -585,7 +598,8 @@ int Handler::on_write_stream(Stream &stream) { if (stream.streambuf_idx == stream.streambuf.size()) { if (stream.should_send_fin) { stream.should_send_fin = false; - if (write_stream_data(stream, 1, nullptr, 0) != 0) { + auto v = Buffer{}; + if (write_stream_data(stream, 1, v) != 0) { return -1; } } @@ -594,46 +608,48 @@ int Handler::on_write_stream(Stream &stream) { for (auto it = std::begin(stream.streambuf) + stream.streambuf_idx; it != std::end(stream.streambuf); ++it) { - const auto &v = *it; + auto &v = *it; auto fin = stream.should_send_fin && stream.streambuf_idx == stream.streambuf.size() - 1; - if (fin) { - stream.should_send_fin = false; - } - if (write_stream_data(stream, fin, v.data(), v.size()) != 0) { + if (write_stream_data(stream, fin, v) != 0) { return -1; } + if (v.left() > 0) { + break; + } ++stream.streambuf_idx; + if (fin) { + stream.should_send_fin = false; + } } - schedule_retransmit(); - return 0; } -int Handler::write_stream_data(Stream &stream, int fin, const uint8_t *data, - size_t datalen) { +int Handler::write_stream_data(Stream &stream, int fin, Buffer &data) { std::array<uint8_t, NGTCP2_MAX_PKTLEN_IPV4> buf; size_t ndatalen; assert(buf.size() >= max_pktlen_); - for (; datalen || fin;) { + for (;;) { auto n = ngtcp2_conn_write_stream(conn_, buf.data(), max_pktlen_, &ndatalen, - stream.stream_id, fin && datalen == 0, - data, datalen, util::timestamp()); + stream.stream_id, fin, data.rpos(), + data.left(), util::timestamp()); if (n < 0) { + if (n == NGTCP2_ERR_STREAM_DATA_BLOCKED) { + return 0; + } std::cerr << "ngtcp2_conn_write_stream: " << ngtcp2_strerror(n) << std::endl; return -1; } - data += ndatalen; - datalen -= ndatalen; + data.pos += ndatalen; if (debug::packet_lost(config.tx_loss_prob)) { std::cerr << "** Simulated outgoing packet loss **" << std::endl; - if (fin && ndatalen == 0) { + if (data.left() == 0) { return 0; } continue; @@ -645,7 +661,7 @@ int Handler::write_stream_data(Stream &stream, int fin, const uint8_t *data, std::cerr << "sendto: " << strerror(errno) << std::endl; return -1; } - if (fin && ndatalen == 0) { + if (data.left() == 0) { return 0; } } @@ -689,20 +705,24 @@ int Handler::recv_stream_data(uint32_t stream_id, uint8_t fin, static constexpr uint8_t start_tag[] = "<blink>"; static constexpr uint8_t end_tag[] = "</blink>"; - auto v = std::vector<uint8_t>(); - v.resize(str_size(start_tag) + datalen + str_size(end_tag)); + auto v = Buffer{}; + v.buf.resize(str_size(start_tag) + datalen + str_size(end_tag)); - auto p = v.data(); + auto p = std::begin(v.buf); p = std::copy_n(start_tag, str_size(start_tag), p); p = std::copy_n(data, datalen, p); p = std::copy_n(end_tag, str_size(end_tag), p); + v.pos = std::begin(v.buf); + stream.streambuf.emplace_back(std::move(v)); } stream.should_send_fin = fin != 0; + ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, datalen); + return 0; } @@ -715,11 +735,11 @@ const Address &Handler::remote_addr() const { return remote_addr_; } ngtcp2_conn *Handler::conn() const { return conn_; } namespace { -void remove_tx_stream_data(std::deque<std::vector<uint8_t>> &d, size_t &idx, - size_t datalen) { - for (; !d.empty() && d.front().size() <= datalen;) { +void remove_tx_stream_data(std::deque<Buffer> &d, size_t &idx, + uint64_t &tx_offset, uint64_t offset) { + for (; !d.empty() && tx_offset + d.front().buf.size() <= offset;) { --idx; - datalen -= d.front().size(); + tx_offset += d.front().buf.size(); d.pop_front(); } } @@ -728,13 +748,15 @@ void remove_tx_stream_data(std::deque<std::vector<uint8_t>> &d, size_t &idx, void Handler::remove_tx_stream_data(uint32_t stream_id, uint64_t offset, size_t datalen) { if (stream_id == 0) { - ::remove_tx_stream_data(shandshake_, shandshake_idx_, datalen); + ::remove_tx_stream_data(shandshake_, shandshake_idx_, tx_stream0_offset_, + offset + datalen); return; } auto it = streams_.find(stream_id); assert(it != std::end(streams_)); auto &stream = (*it).second; - ::remove_tx_stream_data(stream.streambuf, stream.streambuf_idx, datalen); + ::remove_tx_stream_data(stream.streambuf, stream.streambuf_idx, + stream.tx_stream_offset, offset + datalen); } namespace { diff --git a/examples/server.h b/examples/server.h index 0cc59543e37f77343cf7b4d4d9b9c669b4e6beeb..e10b3aedf4e698d8d5996b4ec712d3f29f185ccd 100644 --- a/examples/server.h +++ b/examples/server.h @@ -51,14 +51,29 @@ struct Config { double rx_loss_prob; }; +struct Buffer { + Buffer(const uint8_t *data, size_t datalen); + Buffer(); + size_t left() const { return std::end(buf) - pos; } + const uint8_t *rpos() const { + return buf.data() + std::distance(std::begin(buf), pos); + } + + std::vector<uint8_t> buf; + std::vector<uint8_t>::const_iterator pos; +}; + struct Stream { Stream(uint32_t stream_id); uint32_t stream_id; - std::deque<std::vector<uint8_t>> streambuf; + std::deque<Buffer> streambuf; + // streambuf_idx is the index in streambuf, which points to the + // buffer to send next. size_t streambuf_idx; - size_t stream_woffset; - size_t stream_roffset; + // tx_stream_offset is the offset where all data before offset is + // acked by the remote endpoint. + uint64_t tx_stream_offset; bool should_send_fin; }; @@ -74,8 +89,7 @@ public: int on_read(uint8_t *data, size_t datalen); int on_write(); int on_write_stream(Stream &stream); - int write_stream_data(Stream &stream, int fin, const uint8_t *data, - size_t datalen); + int write_stream_data(Stream &stream, int fin, Buffer &data); int feed_data(uint8_t *data, size_t datalen); void schedule_retransmit(); void signal_write(); @@ -116,12 +130,17 @@ private: ev_timer rttimer_; std::vector<uint8_t> chandshake_; size_t ncread_; - std::deque<std::vector<uint8_t>> shandshake_; + std::deque<Buffer> shandshake_; + // shandshake_idx_ is the index in shandshake_, which points to the + // buffer to read next. size_t shandshake_idx_; ngtcp2_conn *conn_; crypto::Context crypto_ctx_; std::map<uint32_t, Stream> streams_; uint64_t conn_id_; + // tx_stream0_offset_ is the offset where all data before offset is + // acked by the remote endpoint. + uint64_t tx_stream0_offset_; }; class Server { diff --git a/lib/includes/ngtcp2/ngtcp2.h b/lib/includes/ngtcp2/ngtcp2.h index 4345c647343868afcdfab15b2da2362d8bb06ebe..af036886c588dda2aefec70d1dd584db4a9e618d 100644 --- a/lib/includes/ngtcp2/ngtcp2.h +++ b/lib/includes/ngtcp2/ngtcp2.h @@ -176,6 +176,8 @@ typedef enum { NGTCP2_ERR_BAD_ACK = -207, NGTCP2_ERR_STREAM_ID_BLOCKED = -208, NGTCP2_ERR_STREAM_IN_USE = -209, + NGTCP2_ERR_STREAM_DATA_BLOCKED = -210, + NGTCP2_ERR_FLOW_CONTROL = -211, NGTCP2_ERR_FATAL = -500, NGTCP2_ERR_NOMEM = -501, NGTCP2_ERR_CALLBACK_FAILURE = -502, @@ -840,7 +842,7 @@ NGTCP2_EXTERN int ngtcp2_conn_open_stream(ngtcp2_conn *conn, uint32_t stream_id, * :enum:`NGTCP2_ERR_NOBUF` * Buffer is too small * :enum:`NGTCP2_ERR_INVALID_ARGUMENT` - * Stream does not exist. + * Stream does not exist; or |stream_id| is 0. * :enum:`NGTCP2_ERR_CALLBACK_FAILURE` * User callback failed */ @@ -851,6 +853,22 @@ NGTCP2_EXTERN ssize_t ngtcp2_conn_write_stream(ngtcp2_conn *conn, uint8_t *dest, size_t datalen, ngtcp2_tstamp ts); +/** + * @function + * + * `ngtcp2_conn_extend_max_stream_offset` extends stream's max stream + * data value by |datalen|. + * + * This function returns 0 if it succeeds, or one of the following + * negative error codes: + * + * :enum:`NGTCP2_ERR_INVALID_ARGUMENT` + * |stream_id| is 0; or stream was not found + */ +NGTCP2_EXTERN int ngtcp2_conn_extend_max_stream_offset(ngtcp2_conn *conn, + uint32_t stream_id, + size_t datalen); + /** * @function * diff --git a/lib/ngtcp2_acktr.c b/lib/ngtcp2_acktr.c index ac1897c8d141d61f57cafc4947c751ffa09eb1a7..43a2476c43e3e37697b4348080d07e4ac77c8222 100644 --- a/lib/ngtcp2_acktr.c +++ b/lib/ngtcp2_acktr.c @@ -55,7 +55,7 @@ int ngtcp2_acktr_add(ngtcp2_acktr *acktr, ngtcp2_acktr_entry *ent) { } /* TODO What to do if we receive duplicated packet number? */ if ((*pent)->pkt_num == ent->pkt_num) { - return NGTCP2_ERR_INVALID_ARGUMENT; + return NGTCP2_ERR_PROTO; } break; } diff --git a/lib/ngtcp2_acktr.h b/lib/ngtcp2_acktr.h index ae63f065f8b98a3ed1d7c1b97bc3864ecbfe8f29..29a7d04923db4d3dc7525ab74474234b40664f0f 100644 --- a/lib/ngtcp2_acktr.h +++ b/lib/ngtcp2_acktr.h @@ -85,7 +85,7 @@ void ngtcp2_acktr_free(ngtcp2_acktr *acktr); * This function returns 0 if it succeeds, or one of the following * negative error codes: * - * NGTCP2_ERR_INVALID_ARGUMENT + * NGTCP2_ERR_PROTO * Same packet number has already been included in |acktr|. */ int ngtcp2_acktr_add(ngtcp2_acktr *acktr, ngtcp2_acktr_entry *ent); diff --git a/lib/ngtcp2_conn.c b/lib/ngtcp2_conn.c index b741d32402c9dc058993f724d57273f186b54aeb..a3fdd91e8eaa5b85d29284f2f9aa7ed9ba62baeb 100644 --- a/lib/ngtcp2_conn.c +++ b/lib/ngtcp2_conn.c @@ -166,7 +166,10 @@ static int conn_new(ngtcp2_conn **pconn, uint64_t conn_id, uint32_t version, rv = NGTCP2_ERR_NOMEM; goto fail_strm0_malloc; } - rv = ngtcp2_strm_init((*pconn)->strm0, 0, NGTCP2_STRM_FLAG_NONE, NULL, mem); + /* TODO Initial max_stream_data for stream 0? */ + rv = ngtcp2_strm_init((*pconn)->strm0, 0, NGTCP2_STRM_FLAG_NONE, + NGTCP2_STRM0_MAX_STREAM_DATA, + NGTCP2_STRM0_MAX_STREAM_DATA, NULL, mem); if (rv != 0) { goto fail_strm0_init; } @@ -236,8 +239,8 @@ int ngtcp2_conn_client_new(ngtcp2_conn **pconn, uint64_t conn_id, /* TODO Since transport parameters are not required for interop now, just supply sensible default here. Remove this when transport parameter gets mandatory. */ - (*pconn)->remote_settings.max_stream_data = 128 * 1024; - (*pconn)->remote_settings.max_data = 128; + (*pconn)->remote_settings.max_stream_data = 64 * 1024; + (*pconn)->remote_settings.max_data = 64; (*pconn)->remote_settings.max_stream_id = 1; return 0; @@ -262,8 +265,8 @@ int ngtcp2_conn_server_new(ngtcp2_conn **pconn, uint64_t conn_id, /* TODO Since transport parameters are not required for interop now, just supply sensible default here. Remove this when transport parameter gets mandatory. */ - (*pconn)->remote_settings.max_stream_data = 128 * 1024; - (*pconn)->remote_settings.max_data = 128; + (*pconn)->remote_settings.max_stream_data = 64 * 1024; + (*pconn)->remote_settings.max_data = 64; (*pconn)->remote_settings.max_stream_id = 0; return 0; @@ -420,10 +423,11 @@ static ssize_t conn_retransmit_unprotected(ngtcp2_conn *conn, uint8_t *dest, int rv; ngtcp2_upe upe; ngtcp2_pkt_hd hd = ent->hd; - ngtcp2_frame_chain **pfrc; + ngtcp2_frame_chain **pfrc, *frc; ngtcp2_rtb_entry *nent = NULL; ngtcp2_frame localfr; int pkt_empty = 1; + int send_pkt_cb_called = 0; /* This is required because ent->hd may have old client version. */ hd.version = conn->version; @@ -437,29 +441,41 @@ static ssize_t conn_retransmit_unprotected(ngtcp2_conn *conn, uint8_t *dest, return rv; } - rv = conn_call_send_pkt(conn, &hd); - if (rv != 0) { - return rv; - } - /* TODO Don't include ACK in this unprotected packet in order not to ack protected packet here for now. */ - for (pfrc = &ent->frc; *pfrc; pfrc = &(*pfrc)->next) { + for (pfrc = &ent->frc; *pfrc;) { + if ((*pfrc)->fr.type == NGTCP2_FRAME_MAX_STREAM_DATA) { + if ((*pfrc)->fr.max_stream_data.max_stream_data < + conn->strm0->unsent_max_rx_offset) { + frc = *pfrc; + *pfrc = (*pfrc)->next; + ngtcp2_frame_chain_del(frc, conn->mem); + continue; + } + } rv = ngtcp2_upe_encode_frame(&upe, &(*pfrc)->fr); if (rv != 0) { if (rv == NGTCP2_ERR_NOBUF) { break; } } + + if (!send_pkt_cb_called) { + rv = conn_call_send_pkt(conn, &hd); + if (rv != 0) { + return rv; + } + send_pkt_cb_called = 1; + } + rv = conn_call_send_frame(conn, &hd, &(*pfrc)->fr); if (rv != 0) { return rv; } - } - if (*pfrc != ent->frc) { pkt_empty = 0; + pfrc = &(*pfrc)->next; } if (pkt_empty) { @@ -522,6 +538,8 @@ static ssize_t conn_retransmit_protected(ngtcp2_conn *conn, uint8_t *dest, int pkt_empty = 1; ssize_t nwrite; ngtcp2_crypto_ctx ctx; + ngtcp2_strm *strm; + int send_pkt_cb_called = 0; /* This is required because ent->hd may have old client version. */ hd.version = conn->version; @@ -540,11 +558,6 @@ static ssize_t conn_retransmit_protected(ngtcp2_conn *conn, uint8_t *dest, return rv; } - rv = conn_call_send_pkt(conn, &hd); - if (rv != 0) { - return rv; - } - localfr.type = !NGTCP2_FRAME_ACK; rv = conn_create_ack_frame(conn, &localfr.ack, ts); if (rv != 0) { @@ -555,6 +568,15 @@ static ssize_t conn_retransmit_protected(ngtcp2_conn *conn, uint8_t *dest, if (rv != 0) { return rv; } + + if (!send_pkt_cb_called) { + rv = conn_call_send_pkt(conn, &hd); + if (rv != 0) { + return rv; + } + send_pkt_cb_called = 1; + } + rv = conn_call_send_frame(conn, &hd, &localfr); if (rv != 0) { return rv; @@ -564,12 +586,28 @@ static ssize_t conn_retransmit_protected(ngtcp2_conn *conn, uint8_t *dest, } for (pfrc = &ent->frc; *pfrc;) { - if ((*pfrc)->fr.type == NGTCP2_FRAME_MAX_STREAM_ID && - (*pfrc)->fr.max_stream_id.max_stream_id < conn->max_remote_stream_id) { - frc = *pfrc; - pfrc = &(*pfrc)->next; - ngtcp2_frame_chain_del(frc, conn->mem); - continue; + switch ((*pfrc)->fr.type) { + case NGTCP2_FRAME_MAX_STREAM_ID: + if ((*pfrc)->fr.max_stream_id.max_stream_id < + conn->max_remote_stream_id) { + frc = *pfrc; + *pfrc = (*pfrc)->next; + ngtcp2_frame_chain_del(frc, conn->mem); + continue; + } + break; + case NGTCP2_FRAME_MAX_STREAM_DATA: + strm = + ngtcp2_conn_find_stream(conn, (*pfrc)->fr.max_stream_data.stream_id); + if (strm == NULL || + (*pfrc)->fr.max_stream_data.max_stream_data < + strm->unsent_max_rx_offset) { + frc = *pfrc; + *pfrc = (*pfrc)->next; + ngtcp2_frame_chain_del(frc, conn->mem); + continue; + } + break; } rv = ngtcp2_ppe_encode_frame(&ppe, &(*pfrc)->fr); if (rv != 0) { @@ -577,16 +615,22 @@ static ssize_t conn_retransmit_protected(ngtcp2_conn *conn, uint8_t *dest, break; } } + + if (!send_pkt_cb_called) { + rv = conn_call_send_pkt(conn, &hd); + if (rv != 0) { + return rv; + } + send_pkt_cb_called = 1; + } + rv = conn_call_send_frame(conn, &hd, &(*pfrc)->fr); if (rv != 0) { return rv; } - pfrc = &(*pfrc)->next; - } - - if (*pfrc != ent->frc) { pkt_empty = 0; + pfrc = &(*pfrc)->next; } if (pkt_empty) { @@ -713,12 +757,14 @@ static ssize_t conn_encode_handshake_pkt(ngtcp2_conn *conn, uint8_t *dest, int rv; ngtcp2_upe upe; ngtcp2_pkt_hd hd; - ngtcp2_frame_chain *frc = NULL; + ngtcp2_frame_chain *frc = NULL, **pfrc, *frc_head = NULL, *frc_next; ngtcp2_frame *fr, localfr; size_t nwrite; ngtcp2_rtb_entry *rtbent; int pkt_empty = 1; + pfrc = &frc_head; + ngtcp2_pkt_hd_init(&hd, NGTCP2_PKT_FLAG_LONG_FORM, type, conn->conn_id, conn->next_tx_pkt_num, conn->version); @@ -740,15 +786,46 @@ static ssize_t conn_encode_handshake_pkt(ngtcp2_conn *conn, uint8_t *dest, /* TODO Should we retransmit ACK frame? */ rv = conn_create_ack_frame(conn, &localfr.ack, ts); if (rv != 0) { - goto fail; + return rv; } if (localfr.type == NGTCP2_FRAME_ACK) { rv = ngtcp2_upe_encode_frame(&upe, &localfr); if (rv != 0) { - goto fail; + return rv; } rv = conn_call_send_frame(conn, &hd, &localfr); + if (rv != 0) { + return rv; + } + + pkt_empty = 0; + } + + if (conn->strm0->max_rx_offset < conn->strm0->unsent_max_rx_offset) { + rv = ngtcp2_frame_chain_new(&frc, conn->mem); + if (rv != 0) { + return rv; + } + + *pfrc = frc; + pfrc = &frc->next; + + frc->fr.type = NGTCP2_FRAME_MAX_STREAM_DATA; + frc->fr.max_stream_data.stream_id = 0; + frc->fr.max_stream_data.max_stream_data = + conn->strm0->unsent_max_rx_offset; + + /* TODO If we get NGTCP2_ERR_NOBUF below, we lose + MAX_STREAM_DATA update. */ + conn->strm0->max_rx_offset = conn->strm0->unsent_max_rx_offset; + + rv = ngtcp2_upe_encode_frame(&upe, &frc->fr); + if (rv != 0) { + goto fail; + } + + rv = conn_call_send_frame(conn, &hd, &frc->fr); if (rv != 0) { goto fail; } @@ -769,6 +846,8 @@ static ssize_t conn_encode_handshake_pkt(ngtcp2_conn *conn, uint8_t *dest, nwrite = ngtcp2_min(ngtcp2_buf_len(tx_buf), ngtcp2_upe_left(&upe) - NGTCP2_STREAM_OVERHEAD); + nwrite = + ngtcp2_min(nwrite, conn->strm0->max_tx_offset - conn->strm0->tx_offset); if (nwrite > 0) { rv = ngtcp2_frame_chain_new(&frc, conn->mem); @@ -776,6 +855,9 @@ static ssize_t conn_encode_handshake_pkt(ngtcp2_conn *conn, uint8_t *dest, goto fail; } + *pfrc = frc; + pfrc = &frc->next; + fr = &frc->fr; /* TODO Make a function to create STREAM frame */ @@ -814,9 +896,9 @@ static ssize_t conn_encode_handshake_pkt(ngtcp2_conn *conn, uint8_t *dest, ++conn->next_tx_pkt_num; - if (frc) { - rv = ngtcp2_rtb_entry_new(&rtbent, &hd, frc, ts + NGTCP2_INITIAL_EXPIRY, - conn->mem); + if (frc_head) { + rv = ngtcp2_rtb_entry_new(&rtbent, &hd, frc_head, + ts + NGTCP2_INITIAL_EXPIRY, conn->mem); if (rv != 0) { goto fail; } @@ -831,8 +913,11 @@ static ssize_t conn_encode_handshake_pkt(ngtcp2_conn *conn, uint8_t *dest, return (ssize_t)ngtcp2_upe_final(&upe, NULL); fail: - ngtcp2_frame_chain_del(frc, conn->mem); - + for (frc = frc_head; frc;) { + frc_next = frc->next; + ngtcp2_frame_chain_del(frc, conn->mem); + frc = frc_next; + } return rv; } @@ -986,6 +1071,8 @@ static ssize_t conn_send_pkt(ngtcp2_conn *conn, uint8_t *dest, size_t destlen, ngtcp2_frame_chain **pfrc, *nfrc; ngtcp2_rtb_entry *ent; size_t left; + ngtcp2_strm *strm, *strm_next; + int send_pkt_cb_called = 0; ackfr.type = !NGTCP2_FRAME_ACK; rv = conn_create_ack_frame(conn, &ackfr.ack, ts); @@ -1011,17 +1098,20 @@ static ssize_t conn_send_pkt(ngtcp2_conn *conn, uint8_t *dest, size_t destlen, return rv; } - rv = conn_call_send_pkt(conn, &hd); - if (rv != 0) { - return rv; - } - if (ackfr.type == NGTCP2_FRAME_ACK) { rv = ngtcp2_ppe_encode_frame(&ppe, &ackfr); if (rv != 0) { return rv; } + if (!send_pkt_cb_called) { + rv = conn_call_send_pkt(conn, &hd); + if (rv != 0) { + return rv; + } + send_pkt_cb_called = 1; + } + rv = conn_call_send_frame(conn, &hd, &ackfr); if (rv != 0) { return rv; @@ -1043,6 +1133,30 @@ static ssize_t conn_send_pkt(ngtcp2_conn *conn, uint8_t *dest, size_t destlen, buffer is too small. */ } + while (conn->fc_strms) { + strm = conn->fc_strms; + rv = ngtcp2_frame_chain_new(&nfrc, conn->mem); + if (rv != 0) { + return rv; + } + nfrc->fr.type = NGTCP2_FRAME_MAX_STREAM_DATA; + nfrc->fr.max_stream_data.stream_id = strm->stream_id; + nfrc->fr.max_stream_data.max_stream_data = strm->unsent_max_rx_offset; + nfrc->next = conn->frq; + conn->frq = nfrc; + + strm->max_rx_offset = strm->unsent_max_rx_offset; + + strm_next = strm->fc_next; + conn->fc_strms = strm_next; + if (strm_next) { + strm_next->fc_pprev = &conn->fc_strms; + } + strm->fc_next = NULL; + strm->fc_pprev = NULL; + strm = strm_next; + } + for (pfrc = &conn->frq; *pfrc; pfrc = &(*pfrc)->next) { if ((*pfrc)->fr.type == NGTCP2_FRAME_STREAM) { left = ngtcp2_ppe_left(&ppe); @@ -1075,6 +1189,15 @@ static ssize_t conn_send_pkt(ngtcp2_conn *conn, uint8_t *dest, size_t destlen, } return rv; } + + if (!send_pkt_cb_called) { + rv = conn_call_send_pkt(conn, &hd); + if (rv != 0) { + return rv; + } + send_pkt_cb_called = 1; + } + rv = conn_call_send_frame(conn, &hd, &(*pfrc)->fr); if (rv != 0) { return rv; @@ -1219,6 +1342,20 @@ static int conn_recv_ack(ngtcp2_conn *conn, ngtcp2_ack *fr) { return ngtcp2_rtb_recv_ack(&conn->rtb, fr, conn); } +static int conn_recv_max_stream_data(ngtcp2_conn *conn, + const ngtcp2_max_stream_data *fr) { + ngtcp2_strm *strm; + + strm = ngtcp2_conn_find_stream(conn, fr->stream_id); + if (strm == NULL) { + return 0; + } + + strm->max_tx_offset = ngtcp2_max(strm->max_tx_offset, fr->max_stream_data); + + return 0; +} + static int conn_buffer_protected_pkt(ngtcp2_conn *conn, const uint8_t *pkt, size_t pktlen, ngtcp2_tstamp ts) { int rv; @@ -1251,6 +1388,7 @@ static int conn_recv_handshake_pkt(ngtcp2_conn *conn, const uint8_t *pkt, int rv; int require_ack = 0; uint64_t rx_offset; + uint64_t fr_end_offset; if (!(pkt[0] & NGTCP2_HEADER_FORM_BIT)) { return conn_buffer_protected_pkt(conn, pkt, pktlen, ts); @@ -1321,12 +1459,19 @@ static int conn_recv_handshake_pkt(ngtcp2_conn *conn, const uint8_t *pkt, require_ack |= fr.type != NGTCP2_FRAME_ACK && fr.type != NGTCP2_FRAME_CONNECTION_CLOSE; - if (fr.type == NGTCP2_FRAME_ACK) { + switch (fr.type) { + case NGTCP2_FRAME_ACK: rv = conn_recv_ack(conn, &fr.ack); if (rv != 0) { return rv; } continue; + case NGTCP2_FRAME_MAX_STREAM_DATA: + rv = conn_recv_max_stream_data(conn, &fr.max_stream_data); + if (rv != 0) { + return rv; + } + continue; } if (fr.type != NGTCP2_FRAME_STREAM || fr.stream.stream_id != 0 || @@ -1343,10 +1488,9 @@ static int conn_recv_handshake_pkt(ngtcp2_conn *conn, const uint8_t *pkt, continue; } - /* TODO Refused to receive stream data which is more than 128KiB - for now. We can ditch this if flow control is implemented. */ - if (fr.stream.offset > 128 * 1024) { - return NGTCP2_ERR_INTERNAL; + fr_end_offset = fr.stream.offset + fr.stream.datalen; + if (conn->strm0->max_rx_offset < fr_end_offset) { + return NGTCP2_ERR_FLOW_CONTROL; } if (fr.stream.offset <= rx_offset) { @@ -1363,6 +1507,8 @@ static int conn_recv_handshake_pkt(ngtcp2_conn *conn, const uint8_t *pkt, return rv; } + conn->strm0->unsent_max_rx_offset += datalen; + rv = ngtcp2_conn_emit_pending_recv_handshake(conn, conn->strm0, rx_offset); if (rv != 0) { @@ -1416,7 +1562,9 @@ int ngtcp2_conn_init_stream(ngtcp2_conn *conn, ngtcp2_strm *strm, int rv; rv = ngtcp2_strm_init(strm, stream_id, NGTCP2_STRM_FLAG_NONE, - stream_user_data, conn->mem); + conn->local_settings.max_stream_data, + conn->remote_settings.max_stream_data, stream_user_data, + conn->mem); if (rv != 0) { ngtcp2_mem_free(conn->mem, strm); return rv; @@ -1478,8 +1626,7 @@ static int conn_recv_stream(ngtcp2_conn *conn, const ngtcp2_stream *fr) { return NGTCP2_ERR_PROTO; } - if (UINT64_MAX - fr->datalen < fr->offset || - conn->local_settings.max_stream_data < fr->offset + fr->datalen) { + if (UINT64_MAX - fr->datalen < fr->offset) { return NGTCP2_ERR_PROTO; } @@ -1513,6 +1660,11 @@ static int conn_recv_stream(ngtcp2_conn *conn, const ngtcp2_stream *fr) { } fr_end_offset = fr->offset + fr->datalen; + + if (strm->max_rx_offset < fr_end_offset) { + return NGTCP2_ERR_FLOW_CONTROL; + } + strm->last_rx_offset = ngtcp2_max(strm->last_rx_offset, fr_end_offset); if (fr->fin) { @@ -1668,6 +1820,12 @@ static int conn_recv_pkt(ngtcp2_conn *conn, uint8_t *pkt, size_t pktlen, return rv; } break; + case NGTCP2_FRAME_MAX_STREAM_DATA: + rv = conn_recv_max_stream_data(conn, &fr.max_stream_data); + if (rv != 0) { + return rv; + } + break; } } @@ -1801,6 +1959,8 @@ int ngtcp2_conn_emit_pending_recv_handshake(ngtcp2_conn *conn, return rv; } + strm->unsent_max_rx_offset += datalen; + ngtcp2_rob_pop(&strm->rob, rx_offset - datalen, datalen); } } @@ -1819,8 +1979,11 @@ int ngtcp2_conn_sched_ack(ngtcp2_conn *conn, uint64_t pkt_num, return rv; } - /* TODO Ignore error for now */ - ngtcp2_acktr_add(&conn->acktr, rpkt); + rv = ngtcp2_acktr_add(&conn->acktr, rpkt); + if (rv != 0) { + ngtcp2_acktr_entry_del(rpkt, conn->mem); + return rv; + } return 0; } @@ -2074,11 +2237,6 @@ ssize_t ngtcp2_conn_write_stream(ngtcp2_conn *conn, uint8_t *dest, return rv; } - rv = conn_call_send_pkt(conn, &hd); - if (rv != 0) { - return rv; - } - left = ngtcp2_ppe_left(&ppe); if (left <= NGTCP2_STREAM_OVERHEAD) { return NGTCP2_ERR_NOBUF; @@ -2088,6 +2246,16 @@ ssize_t ngtcp2_conn_write_stream(ngtcp2_conn *conn, uint8_t *dest, /* TODO Take into account flow control credit here */ ndatalen = ngtcp2_min(datalen, left); + ndatalen = ngtcp2_min(ndatalen, strm->max_tx_offset - strm->tx_offset); + + if (datalen > 0 && ndatalen == 0) { + return NGTCP2_ERR_STREAM_DATA_BLOCKED; + } + + rv = conn_call_send_pkt(conn, &hd); + if (rv != 0) { + return rv; + } rv = ngtcp2_frame_chain_new(&frc, conn->mem); if (rv != 0) { @@ -2123,6 +2291,7 @@ ssize_t ngtcp2_conn_write_stream(ngtcp2_conn *conn, uint8_t *dest, rv = ngtcp2_rtb_entry_new(&ent, &hd, frc, ts + NGTCP2_INITIAL_EXPIRY, conn->mem); if (rv != 0) { + ngtcp2_frame_chain_del(frc, conn->mem); return rv; } @@ -2163,6 +2332,13 @@ int ngtcp2_conn_close_stream(ngtcp2_conn *conn, ngtcp2_strm *strm) { conn->max_remote_stream_id += 2; } + if (strm->fc_pprev) { + *strm->fc_pprev = strm->fc_next; + if (strm->fc_next) { + strm->fc_next->fc_pprev = strm->fc_pprev; + } + } + ngtcp2_strm_free(strm); ngtcp2_mem_free(conn->mem, strm); @@ -2177,3 +2353,32 @@ int ngtcp2_conn_close_stream_if_shut_rdwr(ngtcp2_conn *conn, } return 0; } + +int ngtcp2_conn_extend_max_stream_offset(ngtcp2_conn *conn, uint32_t stream_id, + size_t datalen) { + ngtcp2_strm *strm; + + if (stream_id == 0) { + return NGTCP2_ERR_INVALID_ARGUMENT; + } + + strm = ngtcp2_conn_find_stream(conn, stream_id); + if (strm == NULL) { + return NGTCP2_ERR_INVALID_ARGUMENT; + } + + if (strm->unsent_max_rx_offset <= UINT64_MAX - datalen) { + strm->unsent_max_rx_offset += datalen; + } + + if (!strm->fc_pprev) { + strm->fc_pprev = &conn->fc_strms; + if (conn->fc_strms) { + strm->fc_next = conn->fc_strms; + conn->fc_strms->fc_pprev = &strm->fc_next; + } + conn->fc_strms = strm; + } + + return 0; +} diff --git a/lib/ngtcp2_conn.h b/lib/ngtcp2_conn.h index ff7f436931cb5070104f5d6563d0254aec60242b..ad7e7cd4391ee73e5b08d6b6d5666f199f85f380 100644 --- a/lib/ngtcp2_conn.h +++ b/lib/ngtcp2_conn.h @@ -60,6 +60,10 @@ typedef enum { packets buffered which arrive before handshake completes. */ #define NGTCP2_MAX_NUM_BUFFED_RX_PPKTS 16 +/* NGTCP2_STRM0_MAX_STREAM_DATA is the maximum stream offset that an + endpoint can send initially. */ +#define NGTCP2_STRM0_MAX_STREAM_DATA 65535 + struct ngtcp2_pkt_chain; typedef struct ngtcp2_pkt_chain ngtcp2_pkt_chain; @@ -98,6 +102,7 @@ struct ngtcp2_conn { ngtcp2_conn_callbacks callbacks; ngtcp2_strm *strm0; ngtcp2_map strms; + ngtcp2_strm *fc_strms; ngtcp2_idtr local_idtr; ngtcp2_idtr remote_idtr; uint64_t conn_id; diff --git a/lib/ngtcp2_err.c b/lib/ngtcp2_err.c index 22a7cb8eacb9fc7b72c9d31e7deb2ed68df519c7..a879a439373057895e2dbcf324fd11f8bff4d974 100644 --- a/lib/ngtcp2_err.c +++ b/lib/ngtcp2_err.c @@ -44,6 +44,10 @@ const char *ngtcp2_strerror(int liberr) { return "ERR_STREAM_ID_BLOCKED"; case NGTCP2_ERR_STREAM_IN_USE: return "ERR_STREAM_IN_USE"; + case NGTCP2_ERR_STREAM_DATA_BLOCKED: + return "ERR_STREAM_DATA_BLOCKED"; + case NGTCP2_ERR_FLOW_CONTROL: + return "ERR_FLOW_CONTROL"; case NGTCP2_ERR_NOMEM: return "ERR_NOMEM"; case NGTCP2_ERR_CALLBACK_FAILURE: diff --git a/lib/ngtcp2_strm.c b/lib/ngtcp2_strm.c index e70fbd5a28e669149d07b018761a675bc4ff3dcc..832fd0728e36137b6c961dd26c87b95ca0e94b72 100644 --- a/lib/ngtcp2_strm.c +++ b/lib/ngtcp2_strm.c @@ -27,6 +27,7 @@ #include <string.h> int ngtcp2_strm_init(ngtcp2_strm *strm, uint32_t stream_id, uint32_t flags, + uint64_t max_rx_offset, uint64_t max_tx_offset, void *stream_user_data, ngtcp2_mem *mem) { int rv; @@ -36,9 +37,13 @@ int ngtcp2_strm_init(ngtcp2_strm *strm, uint32_t stream_id, uint32_t flags, strm->stream_id = stream_id; strm->flags = flags; strm->stream_user_data = stream_user_data; + strm->max_rx_offset = strm->unsent_max_rx_offset = max_rx_offset; + strm->max_tx_offset = max_tx_offset; strm->me.key = stream_id; strm->me.next = NULL; strm->mem = mem; + strm->fc_pprev = NULL; + strm->fc_next = NULL; memset(&strm->tx_buf, 0, sizeof(strm->tx_buf)); rv = ngtcp2_gaptr_init(&strm->acked_tx_offset, mem); diff --git a/lib/ngtcp2_strm.h b/lib/ngtcp2_strm.h index 2d33ea1a4d96252b8d39036478c3b0d3d7305dc1..c681c916e7685fb0758739092f284d8a05d103b0 100644 --- a/lib/ngtcp2_strm.h +++ b/lib/ngtcp2_strm.h @@ -48,14 +48,28 @@ typedef enum { NGTCP2_STRM_FLAG_SHUT_RD | NGTCP2_STRM_FLAG_SHUT_WR, } ngtcp2_strm_flags; -typedef struct { +struct ngtcp2_strm; + +typedef struct ngtcp2_strm ngtcp2_strm; + +struct ngtcp2_strm { ngtcp2_map_entry me; uint64_t tx_offset; ngtcp2_gaptr acked_tx_offset; + /* max_tx_offset is the maximum offset that local endpoint can send + for this stream. */ + uint64_t max_tx_offset; /* last_rx_offset is the largest offset of stream data received for this stream. */ uint64_t last_rx_offset; ngtcp2_rob rob; + /* max_rx_offset is the maximum offset that remote endpoint can send + to this stream. */ + uint64_t max_rx_offset; + /* unsent_max_rx_offset is the maximum offset that remote endpoint + can send to this stream, and it is not notified to the remote + endpoint. unsent_max_rx_offset >= max_rx_offset must be hold. */ + uint64_t unsent_max_rx_offset; ngtcp2_mem *mem; size_t nbuffered; ngtcp2_buf tx_buf; @@ -63,9 +77,11 @@ typedef struct { void *stream_user_data; /* flags is bit-wise OR of zero or more of ngtcp2_strm_flags. */ uint32_t flags; -} ngtcp2_strm; + ngtcp2_strm **fc_pprev, *fc_next; +}; int ngtcp2_strm_init(ngtcp2_strm *strm, uint32_t stream_id, uint32_t flags, + uint64_t max_rx_offset, uint64_t max_tx_offset, void *stream_user_data, ngtcp2_mem *mem); void ngtcp2_strm_free(ngtcp2_strm *strm); diff --git a/tests/main.c b/tests/main.c index eb214ff5f3935f46876263e1a2527f2344491c6f..767e772ad4509bc54b71390b4c10cb19bff71a03 100644 --- a/tests/main.c +++ b/tests/main.c @@ -116,7 +116,13 @@ int main() { !CU_add_test(pSuite, "rtb_recv_ack", test_ngtcp2_rtb_recv_ack) || !CU_add_test(pSuite, "idtr_open", test_ngtcp2_idtr_open) || !CU_add_test(pSuite, "conn_stream_open_close", - test_ngtcp2_conn_stream_open_close)) { + test_ngtcp2_conn_stream_open_close) || + !CU_add_test(pSuite, "conn_stream_rx_flow_control", + test_ngtcp2_conn_stream_rx_flow_control) || + !CU_add_test(pSuite, "conn_stream_rx_flow_control_error", + test_ngtcp2_conn_stream_rx_flow_control_error) || + !CU_add_test(pSuite, "conn_stream_tx_flow_control", + test_ngtcp2_conn_stream_tx_flow_control)) { CU_cleanup_registry(); return CU_get_error(); } diff --git a/tests/ngtcp2_acktr_test.c b/tests/ngtcp2_acktr_test.c index 03f88e47829115e338433d985a995c7057d10695..199ae3b1cdd05e41489aa0cf8f6d3b4b94e5b679 100644 --- a/tests/ngtcp2_acktr_test.c +++ b/tests/ngtcp2_acktr_test.c @@ -76,7 +76,7 @@ void test_ngtcp2_acktr_add(void) { rv = ngtcp2_acktr_add(&acktr, &ents[0]); - CU_ASSERT(NGTCP2_ERR_INVALID_ARGUMENT == rv); + CU_ASSERT(NGTCP2_ERR_PROTO == rv); ngtcp2_acktr_free(&acktr); } diff --git a/tests/ngtcp2_conn_test.c b/tests/ngtcp2_conn_test.c index 03c714cd80fc44c828a848972d1ea383d5a5114b..f41b3a3f8e235e6bdcadd888a61f8f7b9fe0e38e 100644 --- a/tests/ngtcp2_conn_test.c +++ b/tests/ngtcp2_conn_test.c @@ -69,9 +69,18 @@ static ssize_t null_decrypt(ngtcp2_conn *conn, uint8_t *dest, size_t destlen, } static void server_default_settings(ngtcp2_settings *settings) { - settings->max_stream_data = 65536; + settings->max_stream_data = 65535; settings->max_data = 128; - settings->max_stream_id = 3; + settings->max_stream_id = 5; + settings->idle_timeout = 60; + settings->omit_connection_id = 0; + settings->max_packet_size = 65535; +} + +static void client_default_settings(ngtcp2_settings *settings) { + settings->max_stream_data = 65535; + settings->max_data = 128; + settings->max_stream_id = 0; settings->idle_timeout = 60; settings->omit_connection_id = 0; settings->max_packet_size = 65535; @@ -81,29 +90,52 @@ static uint8_t null_key[16]; static uint8_t null_iv[16]; static uint8_t null_data[4096]; -void test_ngtcp2_conn_stream_open_close(void) { - ngtcp2_conn *conn; +static void setup_default_server(ngtcp2_conn **pconn) { ngtcp2_conn_callbacks cb; ngtcp2_settings settings; - uint8_t buf[2048]; - size_t pktlen; - ssize_t spktlen; - int rv; - ngtcp2_frame fr; - ngtcp2_strm *strm; memset(&cb, 0, sizeof(cb)); cb.decrypt = null_decrypt; cb.encrypt = null_encrypt; server_default_settings(&settings); - ngtcp2_conn_server_new(&conn, 0x1, NGTCP2_PROTO_VERSION, &cb, &settings, + ngtcp2_conn_server_new(pconn, 0x1, NGTCP2_PROTO_VERSION, &cb, &settings, + NULL); + ngtcp2_conn_update_tx_keys(*pconn, null_key, sizeof(null_key), null_iv, + sizeof(null_iv)); + ngtcp2_conn_update_rx_keys(*pconn, null_key, sizeof(null_key), null_iv, + sizeof(null_iv)); + (*pconn)->state = NGTCP2_CS_POST_HANDSHAKE; +} + +static void setup_default_client(ngtcp2_conn **pconn) { + ngtcp2_conn_callbacks cb; + ngtcp2_settings settings; + + memset(&cb, 0, sizeof(cb)); + cb.decrypt = null_decrypt; + cb.encrypt = null_encrypt; + client_default_settings(&settings); + + ngtcp2_conn_client_new(pconn, 0x1, NGTCP2_PROTO_VERSION, &cb, &settings, NULL); - ngtcp2_conn_update_tx_keys(conn, null_key, sizeof(null_key), null_iv, + ngtcp2_conn_update_tx_keys(*pconn, null_key, sizeof(null_key), null_iv, sizeof(null_iv)); - ngtcp2_conn_update_rx_keys(conn, null_key, sizeof(null_key), null_iv, + ngtcp2_conn_update_rx_keys(*pconn, null_key, sizeof(null_key), null_iv, sizeof(null_iv)); - conn->state = NGTCP2_CS_POST_HANDSHAKE; + (*pconn)->state = NGTCP2_CS_POST_HANDSHAKE; +} + +void test_ngtcp2_conn_stream_open_close(void) { + ngtcp2_conn *conn; + uint8_t buf[2048]; + size_t pktlen; + ssize_t spktlen; + int rv; + ngtcp2_frame fr; + ngtcp2_strm *strm; + + setup_default_server(&conn); fr.type = NGTCP2_FRAME_STREAM; fr.stream.flags = 0; @@ -113,7 +145,7 @@ void test_ngtcp2_conn_stream_open_close(void) { fr.stream.datalen = 17; fr.stream.data = null_data; - pktlen = write_stream_pkt(conn, buf, sizeof(buf), 0xc, 1, &fr); + pktlen = write_single_frame_pkt(conn, buf, sizeof(buf), 0xc, 1, &fr); rv = ngtcp2_conn_recv(conn, buf, pktlen, 1); @@ -127,7 +159,7 @@ void test_ngtcp2_conn_stream_open_close(void) { fr.stream.offset = 17; fr.stream.datalen = 0; - pktlen = write_stream_pkt(conn, buf, sizeof(buf), 0xc, 2, &fr); + pktlen = write_single_frame_pkt(conn, buf, sizeof(buf), 0xc, 2, &fr); rv = ngtcp2_conn_recv(conn, buf, pktlen, 2); @@ -147,3 +179,161 @@ void test_ngtcp2_conn_stream_open_close(void) { ngtcp2_conn_del(conn); } + +void test_ngtcp2_conn_stream_rx_flow_control(void) { + ngtcp2_conn *conn; + uint8_t buf[2048]; + size_t pktlen; + ssize_t spktlen; + int rv; + ngtcp2_frame fr; + ngtcp2_strm *strm; + size_t i; + + setup_default_server(&conn); + + conn->local_settings.max_stream_data = 2047; + conn->local_settings.max_stream_id = 5; + + for (i = 0; i < 3; ++i) { + uint32_t stream_id = (uint32_t)(i * 2 + 1); + fr.type = NGTCP2_FRAME_STREAM; + fr.stream.flags = 0; + fr.stream.stream_id = stream_id; + fr.stream.fin = 0; + fr.stream.offset = 0; + fr.stream.datalen = 1024; + fr.stream.data = null_data; + + pktlen = write_single_frame_pkt(conn, buf, sizeof(buf), 0xc, i, &fr); + rv = ngtcp2_conn_recv(conn, buf, pktlen, 1); + + CU_ASSERT(0 == rv); + + strm = ngtcp2_conn_find_stream(conn, stream_id); + + CU_ASSERT(NULL != strm); + + rv = ngtcp2_conn_extend_max_stream_offset(conn, stream_id, + fr.stream.datalen); + + CU_ASSERT(0 == rv); + } + + strm = conn->fc_strms; + + CU_ASSERT(5 == strm->stream_id); + + strm = strm->fc_next; + + CU_ASSERT(3 == strm->stream_id); + + strm = strm->fc_next; + + CU_ASSERT(1 == strm->stream_id); + + strm = strm->fc_next; + + CU_ASSERT(NULL == strm); + + spktlen = ngtcp2_conn_send(conn, buf, sizeof(buf), 2); + + CU_ASSERT(spktlen > 0); + CU_ASSERT(NULL == conn->fc_strms); + + for (i = 0; i < 3; ++i) { + uint32_t stream_id = (uint32_t)(i * 2 + 1); + strm = ngtcp2_conn_find_stream(conn, stream_id); + + CU_ASSERT(2047 + 1024 == strm->max_rx_offset); + } + + ngtcp2_conn_del(conn); +} + +void test_ngtcp2_conn_stream_rx_flow_control_error(void) { + ngtcp2_conn *conn; + uint8_t buf[2048]; + size_t pktlen; + int rv; + ngtcp2_frame fr; + + setup_default_server(&conn); + + conn->local_settings.max_stream_data = 1023; + + fr.type = NGTCP2_FRAME_STREAM; + fr.stream.flags = 0; + fr.stream.stream_id = 1; + fr.stream.fin = 0; + fr.stream.offset = 0; + fr.stream.datalen = 1024; + fr.stream.data = null_data; + + pktlen = write_single_frame_pkt(conn, buf, sizeof(buf), 0xc, 1, &fr); + rv = ngtcp2_conn_recv(conn, buf, pktlen, 1); + + CU_ASSERT(NGTCP2_ERR_FLOW_CONTROL == rv); + + ngtcp2_conn_del(conn); +} + +void test_ngtcp2_conn_stream_tx_flow_control(void) { + ngtcp2_conn *conn; + uint8_t buf[2048]; + size_t pktlen; + ssize_t spktlen; + int rv; + ngtcp2_frame fr; + ngtcp2_strm *strm; + size_t nwrite; + + setup_default_client(&conn); + + conn->remote_settings.max_stream_data = 2047; + conn->remote_settings.max_stream_id = 5; + + rv = ngtcp2_conn_open_stream(conn, 1, NULL); + + CU_ASSERT(0 == rv); + + strm = ngtcp2_conn_find_stream(conn, 1); + spktlen = ngtcp2_conn_write_stream(conn, buf, sizeof(buf), &nwrite, 1, 0, + null_data, 1024, 1); + + CU_ASSERT(spktlen > 0); + CU_ASSERT(1024 == nwrite); + CU_ASSERT(1024 == strm->tx_offset); + + spktlen = ngtcp2_conn_write_stream(conn, buf, sizeof(buf), &nwrite, 1, 0, + null_data, 1024, 2); + + CU_ASSERT(spktlen > 0); + CU_ASSERT(1023 == nwrite); + CU_ASSERT(2047 == strm->tx_offset); + + spktlen = ngtcp2_conn_write_stream(conn, buf, sizeof(buf), &nwrite, 1, 0, + null_data, 1024, 3); + + CU_ASSERT(NGTCP2_ERR_STREAM_DATA_BLOCKED == spktlen); + + fr.type = NGTCP2_FRAME_MAX_STREAM_DATA; + fr.max_stream_data.stream_id = 1; + fr.max_stream_data.max_stream_data = 2048; + + pktlen = write_single_frame_pkt(conn, buf, sizeof(buf), 0xc, 1, &fr); + + rv = ngtcp2_conn_recv(conn, buf, pktlen, 4); + + CU_ASSERT(0 == rv); + CU_ASSERT(2048 == strm->max_tx_offset); + + spktlen = ngtcp2_conn_write_stream(conn, buf, sizeof(buf), &nwrite, 1, 0, + null_data, 1024, 5); + + CU_ASSERT(spktlen > 0); + CU_ASSERT(1 == nwrite); + CU_ASSERT(2048 == strm->tx_offset); + + ngtcp2_conn_del(conn); +} diff --git a/tests/ngtcp2_conn_test.h b/tests/ngtcp2_conn_test.h index c196843c06bbfcb574bb396f9ebf159c3ba4e0b6..7c09d04df1383cbe20c737fea9efafdeb0b4498d 100644 --- a/tests/ngtcp2_conn_test.h +++ b/tests/ngtcp2_conn_test.h @@ -30,5 +30,8 @@ #endif /* HAVE_CONFIG_H */ void test_ngtcp2_conn_stream_open_close(void); +void test_ngtcp2_conn_stream_rx_flow_control(void); +void test_ngtcp2_conn_stream_rx_flow_control_error(void); +void test_ngtcp2_conn_stream_tx_flow_control(void); #endif /* NGTCP2_CONN_TEST_H */ diff --git a/tests/ngtcp2_test_helper.c b/tests/ngtcp2_test_helper.c index 05ebed2bb7f901715092777e398fc1d115297cc8..180cc4ea1f9a9d913b15204b16d7e07cb372e6ee 100644 --- a/tests/ngtcp2_test_helper.c +++ b/tests/ngtcp2_test_helper.c @@ -120,9 +120,9 @@ static ssize_t null_encrypt(ngtcp2_conn *conn, uint8_t *dest, size_t destlen, return (ssize_t)plaintextlen; } -size_t write_stream_pkt(ngtcp2_conn *conn, uint8_t *out, size_t outlen, - uint64_t conn_id, uint64_t pkt_num, - const ngtcp2_frame *fr) { +size_t write_single_frame_pkt(ngtcp2_conn *conn, uint8_t *out, size_t outlen, + uint64_t conn_id, uint64_t pkt_num, + const ngtcp2_frame *fr) { ngtcp2_crypto_ctx ctx; ngtcp2_ppe ppe; ngtcp2_mem *mem = ngtcp2_mem_default(); diff --git a/tests/ngtcp2_test_helper.h b/tests/ngtcp2_test_helper.h index e7449d779cd39b8815c90a6346041cdf3db3762f..ca92ada7cdcf730c6fc526fd95afabe0bd86f42b 100644 --- a/tests/ngtcp2_test_helper.h +++ b/tests/ngtcp2_test_helper.h @@ -70,12 +70,12 @@ size_t ngtcp2_t_encode_ack_frame(uint8_t *out, uint64_t largest_ack, uint64_t ack_blklen); /* - * write_stream_pkt writes a QUIC packet containing single frame |fr| - * in |out| whose capacity is |outlen|. This function returns the - * number of bytes written. + * write_single_frame_pkt writes a QUIC packet containing single frame + * |fr| in |out| whose capacity is |outlen|. This function returns + * the number of bytes written. */ -size_t write_stream_pkt(ngtcp2_conn *conn, uint8_t *out, size_t outlen, - uint64_t conn_id, uint64_t pkt_num, - const ngtcp2_frame *fr); +size_t write_single_frame_pkt(ngtcp2_conn *conn, uint8_t *out, size_t outlen, + uint64_t conn_id, uint64_t pkt_num, + const ngtcp2_frame *fr); #endif /* NGTCP2_TEST_HELPER_H */