From 4ecd51ed2282daff1cd58722d72eb3847a722846 Mon Sep 17 00:00:00 2001
From: Tatsuhiro Tsujikawa <tatsuhiro.t@gmail.com>
Date: Sun, 30 Jul 2017 14:28:23 +0900
Subject: [PATCH] Implement stream level flow control

---
 examples/client.cc           | 107 +++++++-----
 examples/client.h            |  28 ++-
 examples/server.cc           |  94 +++++++----
 examples/server.h            |  31 +++-
 lib/includes/ngtcp2/ngtcp2.h |  20 ++-
 lib/ngtcp2_acktr.c           |   2 +-
 lib/ngtcp2_acktr.h           |   2 +-
 lib/ngtcp2_conn.c            | 319 ++++++++++++++++++++++++++++-------
 lib/ngtcp2_conn.h            |   5 +
 lib/ngtcp2_err.c             |   4 +
 lib/ngtcp2_strm.c            |   5 +
 lib/ngtcp2_strm.h            |  20 ++-
 tests/main.c                 |   8 +-
 tests/ngtcp2_acktr_test.c    |   2 +-
 tests/ngtcp2_conn_test.c     | 222 ++++++++++++++++++++++--
 tests/ngtcp2_conn_test.h     |   3 +
 tests/ngtcp2_test_helper.c   |   6 +-
 tests/ngtcp2_test_helper.h   |  12 +-
 18 files changed, 717 insertions(+), 173 deletions(-)

diff --git a/examples/client.cc b/examples/client.cc
index ba4a3f36..9040087a 100644
--- a/examples/client.cc
+++ b/examples/client.cc
@@ -54,6 +54,11 @@ namespace {
 Config config{};
 } // namespace
 
+Buffer::Buffer(const uint8_t *data, size_t datalen)
+    : buf{data, data + datalen}, pos(std::begin(buf)) {}
+
+Buffer::Buffer() : pos(std::begin(buf)) {}
+
 namespace {
 int bio_write(BIO *b, const char *buf, int len) {
   int rv;
@@ -138,7 +143,13 @@ BIO_METHOD *create_bio_method() {
 } // namespace
 
 namespace {
-void writecb(struct ev_loop *loop, ev_io *w, int revents) {}
+void writecb(struct ev_loop *loop, ev_io *w, int revents) {
+  auto c = static_cast<Client *>(w->data);
+
+  if (c->on_write() != 0) {
+    c->disconnect();
+  }
+}
 } // namespace
 
 namespace {
@@ -192,10 +203,13 @@ Client::Client(struct ev_loop *loop, SSL_CTX *ssl_ctx)
       stdinfd_(-1),
       stream_id_(0),
       chandshake_idx_(0),
+      tx_stream0_offset_(0),
       nsread_(0),
       conn_(nullptr),
       crypto_ctx_{},
-      streambuf_idx_(0) {
+      streambuf_idx_(0),
+      tx_stream_offset_(0),
+      should_send_fin_(false) {
   ev_io_init(&wev_, writecb, 0, EV_WRITE);
   ev_io_init(&rev_, readcb, 0, EV_READ);
   ev_io_init(&stdinrev_, stdin_readcb, 0, EV_READ);
@@ -286,7 +300,7 @@ int recv_stream_data(ngtcp2_conn *conn, uint32_t stream_id, uint8_t fin,
                      const uint8_t *data, size_t datalen, void *user_data,
                      void *stream_user_data) {
   debug::print_stream_data(stream_id, data, datalen);
-
+  ngtcp2_conn_extend_max_stream_offset(conn, stream_id, datalen);
   return 0;
 }
 } // namespace
@@ -410,8 +424,8 @@ int Client::init(int fd, const Address &remote_addr, const char *addr,
       0, std::numeric_limits<uint64_t>::max())(randgen);
 
   ngtcp2_settings settings;
-  settings.max_stream_data = 128_k;
-  settings.max_data = 128;
+  settings.max_stream_data = 64_k;
+  settings.max_data = 64;
   settings.max_stream_id = 0;
   settings.idle_timeout = 5;
   settings.omit_connection_id = 0;
@@ -514,6 +528,26 @@ int Client::on_write() {
   std::array<uint8_t, NGTCP2_MAX_PKTLEN_IPV4> buf;
   assert(buf.size() >= max_pktlen_);
 
+  for (auto it = std::begin(streambuf_) + streambuf_idx_;
+       it != std::end(streambuf_); ++it) {
+    auto &v = *it;
+    if (on_write_stream(stream_id_, 0, v) != 0) {
+      return -1;
+    }
+    if (v.left() > 0) {
+      break;
+    }
+    ++streambuf_idx_;
+  }
+
+  if (streambuf_idx_ == streambuf_.size() && should_send_fin_) {
+    should_send_fin_ = false;
+    auto v = Buffer{};
+    if (on_write_stream(stream_id_, 1, v) != 0) {
+      return -1;
+    }
+  }
+
   for (;;) {
     auto n =
         ngtcp2_conn_send(conn_, buf.data(), max_pktlen_, util::timestamp());
@@ -522,8 +556,7 @@ int Client::on_write() {
       return -1;
     }
     if (n == 0) {
-      schedule_retransmit();
-      return 0;
+      break;
     }
 
     if (debug::packet_lost(config.tx_loss_prob)) {
@@ -537,30 +570,34 @@ int Client::on_write() {
       return -1;
     }
   }
+
+  schedule_retransmit();
+  return 0;
 }
 
-int Client::on_write_stream(uint32_t stream_id, uint8_t fin,
-                            const uint8_t *data, size_t datalen) {
+int Client::on_write_stream(uint32_t stream_id, uint8_t fin, Buffer &data) {
   std::array<uint8_t, NGTCP2_MAX_PKTLEN_IPV4> buf;
   assert(buf.size() >= max_pktlen_);
   size_t ndatalen;
 
   for (;;) {
     auto n = ngtcp2_conn_write_stream(conn_, buf.data(), max_pktlen_, &ndatalen,
-                                      stream_id, fin, data, datalen,
+                                      stream_id, fin, data.rpos(), data.left(),
                                       util::timestamp());
     if (n < 0) {
+      if (n == NGTCP2_ERR_STREAM_DATA_BLOCKED) {
+        return 0;
+      }
       std::cerr << "ngtcp2_conn_write_stream: " << ngtcp2_strerror(n)
                 << std::endl;
       return -1;
     }
 
-    data += ndatalen;
-    datalen -= ndatalen;
+    data.pos += ndatalen;
 
     if (debug::packet_lost(config.tx_loss_prob)) {
       std::cerr << "** Simulated outgoing packet loss **" << std::endl;
-      if (datalen == 0) {
+      if (data.left() == 0) {
         break;
       }
       continue;
@@ -572,14 +609,11 @@ int Client::on_write_stream(uint32_t stream_id, uint8_t fin,
       return -1;
     }
 
-    if (datalen == 0) {
+    if (data.left() == 0) {
       break;
     }
   }
 
-  if (datalen == 0) {
-    schedule_retransmit();
-  }
   return 0;
 }
 
@@ -602,7 +636,7 @@ void Client::schedule_retransmit() {
 }
 
 int Client::write_client_handshake(const uint8_t *data, size_t datalen) {
-  chandshake_.emplace_back(data, data + datalen);
+  chandshake_.emplace_back(data, datalen);
   return 0;
 }
 
@@ -611,8 +645,8 @@ size_t Client::read_client_handshake(const uint8_t **pdest) {
     return 0;
   }
   const auto &v = chandshake_[chandshake_idx_++];
-  *pdest = v.data();
-  return v.size();
+  *pdest = v.buf.data();
+  return v.buf.size();
 }
 
 size_t Client::read_server_handshake(uint8_t *buf, size_t buflen) {
@@ -753,37 +787,32 @@ int Client::send_interactive_input() {
     return stop_interactive_input();
   }
 
-  streambuf_.emplace_back(std::begin(buf), std::end(buf));
-  const auto &v = streambuf_.back();
+  streambuf_.emplace_back(buf.data(), nread);
 
-  rv = on_write_stream(stream_id_, 0, v.data(), nread);
-  if (rv != 0) {
-    return -1;
-  }
+  ev_feed_event(loop_, &wev_, EV_WRITE);
 
-  return on_write();
+  return 0;
 }
 
 int Client::stop_interactive_input() {
   int rv;
-  rv = on_write_stream(stream_id_, 1, nullptr, 0);
-  if (rv != 0) {
-    return -1;
-  }
 
+  should_send_fin_ = true;
   ev_io_stop(loop_, &stdinrev_);
 
   std::cerr << "Interactive session has ended." << std::endl;
 
-  return on_write();
+  ev_feed_event(loop_, &wev_, EV_WRITE);
+
+  return 0;
 }
 
 namespace {
-void remove_tx_stream_data(std::deque<std::vector<uint8_t>> &d, size_t &idx,
-                           size_t datalen) {
-  for (; !d.empty() && d.front().size() <= datalen;) {
+void remove_tx_stream_data(std::deque<Buffer> &d, size_t &idx,
+                           uint64_t &tx_offset, uint64_t offset) {
+  for (; !d.empty() && tx_offset + d.front().buf.size() <= offset;) {
     --idx;
-    datalen -= d.front().size();
+    tx_offset += d.front().buf.size();
     d.pop_front();
   }
 }
@@ -792,10 +821,12 @@ void remove_tx_stream_data(std::deque<std::vector<uint8_t>> &d, size_t &idx,
 void Client::remove_tx_stream_data(uint32_t stream_id, uint64_t offset,
                                    size_t datalen) {
   if (stream_id == 0) {
-    ::remove_tx_stream_data(chandshake_, chandshake_idx_, datalen);
+    ::remove_tx_stream_data(chandshake_, chandshake_idx_, tx_stream0_offset_,
+                            offset + datalen);
     return;
   }
-  ::remove_tx_stream_data(streambuf_, streambuf_idx_, datalen);
+  ::remove_tx_stream_data(streambuf_, streambuf_idx_, tx_stream_offset_,
+                          offset + datalen);
 }
 
 namespace {
diff --git a/examples/client.h b/examples/client.h
index 85be4693..d347d483 100644
--- a/examples/client.h
+++ b/examples/client.h
@@ -53,6 +53,18 @@ struct Config {
   int fd;
 };
 
+struct Buffer {
+  Buffer(const uint8_t *data, size_t datalen);
+  Buffer();
+  size_t left() const { return std::end(buf) - pos; }
+  const uint8_t *rpos() const {
+    return buf.data() + std::distance(std::begin(buf), pos);
+  }
+
+  std::vector<uint8_t> buf;
+  std::vector<uint8_t>::const_iterator pos;
+};
+
 class Client {
 public:
   Client(struct ev_loop *loop, SSL_CTX *ssl_ctx);
@@ -64,8 +76,7 @@ public:
   int tls_handshake();
   int on_read();
   int on_write();
-  int on_write_stream(uint32_t stream_id, uint8_t fin, const uint8_t *data,
-                      size_t datalen);
+  int on_write_stream(uint32_t stream_id, uint8_t fin, Buffer &data);
   int feed_data(uint8_t *data, size_t datalen);
   void schedule_retransmit();
 
@@ -105,14 +116,23 @@ private:
   int fd_;
   int stdinfd_;
   uint32_t stream_id_;
-  std::deque<std::vector<uint8_t>> chandshake_;
+  std::deque<Buffer> chandshake_;
+  // chandshake_idx_ is the index in chandshake_, which points to the
+  // buffer to read next.
   size_t chandshake_idx_;
+  uint64_t tx_stream0_offset_;
   std::vector<uint8_t> shandshake_;
   size_t nsread_;
   ngtcp2_conn *conn_;
   crypto::Context crypto_ctx_;
-  std::deque<std::vector<uint8_t>> streambuf_;
+  std::deque<Buffer> streambuf_;
+  // streambuf_idx_ is the index in streambuf_, which points to the
+  // buffer to send next.
   size_t streambuf_idx_;
+  // tx_stream_offset_ is the offset where all data before offset is
+  // acked by the remote endpoint.
+  uint64_t tx_stream_offset_;
+  bool should_send_fin_;
 };
 
 #endif // CLIENT_H
diff --git a/examples/server.cc b/examples/server.cc
index 387bec43..fc7938c5 100644
--- a/examples/server.cc
+++ b/examples/server.cc
@@ -53,6 +53,11 @@ namespace {
 Config config{};
 } // namespace
 
+Buffer::Buffer(const uint8_t *data, size_t datalen)
+    : buf{data, data + datalen}, pos(std::begin(buf)) {}
+
+Buffer::Buffer() : pos(std::begin(buf)) {}
+
 namespace {
 int bio_write(BIO *b, const char *buf, int len) {
   int rv;
@@ -137,7 +142,10 @@ BIO_METHOD *create_bio_method() {
 } // namespace
 
 Stream::Stream(uint32_t stream_id)
-    : stream_id(stream_id), streambuf_idx(0), should_send_fin(false) {}
+    : stream_id(stream_id),
+      streambuf_idx(0),
+      tx_stream_offset(0),
+      should_send_fin(false) {}
 
 namespace {
 void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
@@ -175,7 +183,8 @@ Handler::Handler(struct ev_loop *loop, SSL_CTX *ssl_ctx, Server *server)
       conn_(nullptr),
       crypto_ctx_{},
       conn_id_(std::uniform_int_distribution<uint64_t>(
-          0, std::numeric_limits<uint64_t>::max())(randgen)) {
+          0, std::numeric_limits<uint64_t>::max())(randgen)),
+      tx_stream0_offset_(0) {
   ev_timer_init(&timer_, timeoutcb, 0., 30.);
   timer_.data = this;
   ev_timer_init(&rttimer_, retransmitcb, 0., 0.);
@@ -354,8 +363,8 @@ int Handler::init(int fd, const sockaddr *sa, socklen_t salen) {
 
   ngtcp2_settings settings;
 
-  settings.max_stream_data = 128_k;
-  settings.max_data = 128;
+  settings.max_stream_data = 64_k;
+  settings.max_data = 64;
   // TODO Just allow stream ID = 1 to exchange encrypted data for now.
   settings.max_stream_id = 1;
   settings.idle_timeout = 5;
@@ -404,7 +413,7 @@ int Handler::tls_handshake() {
 }
 
 int Handler::write_server_handshake(const uint8_t *data, size_t datalen) {
-  shandshake_.emplace_back(data, data + datalen);
+  shandshake_.emplace_back(data, datalen);
   return 0;
 }
 
@@ -412,9 +421,11 @@ size_t Handler::read_server_handshake(const uint8_t **pdest) {
   if (shandshake_idx_ == shandshake_.size()) {
     return 0;
   }
-  const auto &v = shandshake_[shandshake_idx_++];
-  *pdest = v.data();
-  return v.size();
+  auto &v = shandshake_[shandshake_idx_++];
+  *pdest = v.rpos();
+  auto left = v.left();
+  v.pos += left;
+  return left;
 }
 
 size_t Handler::read_client_handshake(uint8_t *buf, size_t buflen) {
@@ -558,8 +569,7 @@ int Handler::on_write() {
       return -1;
     }
     if (n == 0) {
-      schedule_retransmit();
-      return 0;
+      break;
     }
 
     if (debug::packet_lost(config.tx_loss_prob)) {
@@ -574,6 +584,9 @@ int Handler::on_write() {
       return -1;
     }
   }
+
+  schedule_retransmit();
+  return 0;
 }
 
 int Handler::on_write_stream(Stream &stream) {
@@ -585,7 +598,8 @@ int Handler::on_write_stream(Stream &stream) {
   if (stream.streambuf_idx == stream.streambuf.size()) {
     if (stream.should_send_fin) {
       stream.should_send_fin = false;
-      if (write_stream_data(stream, 1, nullptr, 0) != 0) {
+      auto v = Buffer{};
+      if (write_stream_data(stream, 1, v) != 0) {
         return -1;
       }
     }
@@ -594,46 +608,48 @@ int Handler::on_write_stream(Stream &stream) {
 
   for (auto it = std::begin(stream.streambuf) + stream.streambuf_idx;
        it != std::end(stream.streambuf); ++it) {
-    const auto &v = *it;
+    auto &v = *it;
     auto fin = stream.should_send_fin &&
                stream.streambuf_idx == stream.streambuf.size() - 1;
-    if (fin) {
-      stream.should_send_fin = false;
-    }
-    if (write_stream_data(stream, fin, v.data(), v.size()) != 0) {
+    if (write_stream_data(stream, fin, v) != 0) {
       return -1;
     }
+    if (v.left() > 0) {
+      break;
+    }
     ++stream.streambuf_idx;
+    if (fin) {
+      stream.should_send_fin = false;
+    }
   }
 
-  schedule_retransmit();
-
   return 0;
 }
 
-int Handler::write_stream_data(Stream &stream, int fin, const uint8_t *data,
-                               size_t datalen) {
+int Handler::write_stream_data(Stream &stream, int fin, Buffer &data) {
   std::array<uint8_t, NGTCP2_MAX_PKTLEN_IPV4> buf;
   size_t ndatalen;
 
   assert(buf.size() >= max_pktlen_);
 
-  for (; datalen || fin;) {
+  for (;;) {
     auto n = ngtcp2_conn_write_stream(conn_, buf.data(), max_pktlen_, &ndatalen,
-                                      stream.stream_id, fin && datalen == 0,
-                                      data, datalen, util::timestamp());
+                                      stream.stream_id, fin, data.rpos(),
+                                      data.left(), util::timestamp());
     if (n < 0) {
+      if (n == NGTCP2_ERR_STREAM_DATA_BLOCKED) {
+        return 0;
+      }
       std::cerr << "ngtcp2_conn_write_stream: " << ngtcp2_strerror(n)
                 << std::endl;
       return -1;
     }
 
-    data += ndatalen;
-    datalen -= ndatalen;
+    data.pos += ndatalen;
 
     if (debug::packet_lost(config.tx_loss_prob)) {
       std::cerr << "** Simulated outgoing packet loss **" << std::endl;
-      if (fin && ndatalen == 0) {
+      if (data.left() == 0) {
         return 0;
       }
       continue;
@@ -645,7 +661,7 @@ int Handler::write_stream_data(Stream &stream, int fin, const uint8_t *data,
       std::cerr << "sendto: " << strerror(errno) << std::endl;
       return -1;
     }
-    if (fin && ndatalen == 0) {
+    if (data.left() == 0) {
       return 0;
     }
   }
@@ -689,20 +705,24 @@ int Handler::recv_stream_data(uint32_t stream_id, uint8_t fin,
     static constexpr uint8_t start_tag[] = "<blink>";
     static constexpr uint8_t end_tag[] = "</blink>";
 
-    auto v = std::vector<uint8_t>();
-    v.resize(str_size(start_tag) + datalen + str_size(end_tag));
+    auto v = Buffer{};
+    v.buf.resize(str_size(start_tag) + datalen + str_size(end_tag));
 
-    auto p = v.data();
+    auto p = std::begin(v.buf);
 
     p = std::copy_n(start_tag, str_size(start_tag), p);
     p = std::copy_n(data, datalen, p);
     p = std::copy_n(end_tag, str_size(end_tag), p);
 
+    v.pos = std::begin(v.buf);
+
     stream.streambuf.emplace_back(std::move(v));
   }
 
   stream.should_send_fin = fin != 0;
 
+  ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, datalen);
+
   return 0;
 }
 
@@ -715,11 +735,11 @@ const Address &Handler::remote_addr() const { return remote_addr_; }
 ngtcp2_conn *Handler::conn() const { return conn_; }
 
 namespace {
-void remove_tx_stream_data(std::deque<std::vector<uint8_t>> &d, size_t &idx,
-                           size_t datalen) {
-  for (; !d.empty() && d.front().size() <= datalen;) {
+void remove_tx_stream_data(std::deque<Buffer> &d, size_t &idx,
+                           uint64_t &tx_offset, uint64_t offset) {
+  for (; !d.empty() && tx_offset + d.front().buf.size() <= offset;) {
     --idx;
-    datalen -= d.front().size();
+    tx_offset += d.front().buf.size();
     d.pop_front();
   }
 }
@@ -728,13 +748,15 @@ void remove_tx_stream_data(std::deque<std::vector<uint8_t>> &d, size_t &idx,
 void Handler::remove_tx_stream_data(uint32_t stream_id, uint64_t offset,
                                     size_t datalen) {
   if (stream_id == 0) {
-    ::remove_tx_stream_data(shandshake_, shandshake_idx_, datalen);
+    ::remove_tx_stream_data(shandshake_, shandshake_idx_, tx_stream0_offset_,
+                            offset + datalen);
     return;
   }
   auto it = streams_.find(stream_id);
   assert(it != std::end(streams_));
   auto &stream = (*it).second;
-  ::remove_tx_stream_data(stream.streambuf, stream.streambuf_idx, datalen);
+  ::remove_tx_stream_data(stream.streambuf, stream.streambuf_idx,
+                          stream.tx_stream_offset, offset + datalen);
 }
 
 namespace {
diff --git a/examples/server.h b/examples/server.h
index 0cc59543..e10b3aed 100644
--- a/examples/server.h
+++ b/examples/server.h
@@ -51,14 +51,29 @@ struct Config {
   double rx_loss_prob;
 };
 
+struct Buffer {
+  Buffer(const uint8_t *data, size_t datalen);
+  Buffer();
+  size_t left() const { return std::end(buf) - pos; }
+  const uint8_t *rpos() const {
+    return buf.data() + std::distance(std::begin(buf), pos);
+  }
+
+  std::vector<uint8_t> buf;
+  std::vector<uint8_t>::const_iterator pos;
+};
+
 struct Stream {
   Stream(uint32_t stream_id);
 
   uint32_t stream_id;
-  std::deque<std::vector<uint8_t>> streambuf;
+  std::deque<Buffer> streambuf;
+  // streambuf_idx is the index in streambuf, which points to the
+  // buffer to send next.
   size_t streambuf_idx;
-  size_t stream_woffset;
-  size_t stream_roffset;
+  // tx_stream_offset is the offset where all data before offset is
+  // acked by the remote endpoint.
+  uint64_t tx_stream_offset;
   bool should_send_fin;
 };
 
@@ -74,8 +89,7 @@ public:
   int on_read(uint8_t *data, size_t datalen);
   int on_write();
   int on_write_stream(Stream &stream);
-  int write_stream_data(Stream &stream, int fin, const uint8_t *data,
-                        size_t datalen);
+  int write_stream_data(Stream &stream, int fin, Buffer &data);
   int feed_data(uint8_t *data, size_t datalen);
   void schedule_retransmit();
   void signal_write();
@@ -116,12 +130,17 @@ private:
   ev_timer rttimer_;
   std::vector<uint8_t> chandshake_;
   size_t ncread_;
-  std::deque<std::vector<uint8_t>> shandshake_;
+  std::deque<Buffer> shandshake_;
+  // shandshake_idx_ is the index in shandshake_, which points to the
+  // buffer to read next.
   size_t shandshake_idx_;
   ngtcp2_conn *conn_;
   crypto::Context crypto_ctx_;
   std::map<uint32_t, Stream> streams_;
   uint64_t conn_id_;
+  // tx_stream0_offset_ is the offset where all data before offset is
+  // acked by the remote endpoint.
+  uint64_t tx_stream0_offset_;
 };
 
 class Server {
diff --git a/lib/includes/ngtcp2/ngtcp2.h b/lib/includes/ngtcp2/ngtcp2.h
index 4345c647..af036886 100644
--- a/lib/includes/ngtcp2/ngtcp2.h
+++ b/lib/includes/ngtcp2/ngtcp2.h
@@ -176,6 +176,8 @@ typedef enum {
   NGTCP2_ERR_BAD_ACK = -207,
   NGTCP2_ERR_STREAM_ID_BLOCKED = -208,
   NGTCP2_ERR_STREAM_IN_USE = -209,
+  NGTCP2_ERR_STREAM_DATA_BLOCKED = -210,
+  NGTCP2_ERR_FLOW_CONTROL = -211,
   NGTCP2_ERR_FATAL = -500,
   NGTCP2_ERR_NOMEM = -501,
   NGTCP2_ERR_CALLBACK_FAILURE = -502,
@@ -840,7 +842,7 @@ NGTCP2_EXTERN int ngtcp2_conn_open_stream(ngtcp2_conn *conn, uint32_t stream_id,
  * :enum:`NGTCP2_ERR_NOBUF`
  *     Buffer is too small
  * :enum:`NGTCP2_ERR_INVALID_ARGUMENT`
- *     Stream does not exist.
+ *     Stream does not exist; or |stream_id| is 0.
  * :enum:`NGTCP2_ERR_CALLBACK_FAILURE`
  *     User callback failed
  */
@@ -851,6 +853,22 @@ NGTCP2_EXTERN ssize_t ngtcp2_conn_write_stream(ngtcp2_conn *conn, uint8_t *dest,
                                                size_t datalen,
                                                ngtcp2_tstamp ts);
 
+/**
+ * @function
+ *
+ * `ngtcp2_conn_extend_max_stream_offset` extends stream's max stream
+ * data value by |datalen|.
+ *
+ * This function returns 0 if it succeeds, or one of the following
+ * negative error codes:
+ *
+ * :enum:`NGTCP2_ERR_INVALID_ARGUMENT`
+ *     |stream_id| is 0; or stream was not found
+ */
+NGTCP2_EXTERN int ngtcp2_conn_extend_max_stream_offset(ngtcp2_conn *conn,
+                                                       uint32_t stream_id,
+                                                       size_t datalen);
+
 /**
  * @function
  *
diff --git a/lib/ngtcp2_acktr.c b/lib/ngtcp2_acktr.c
index ac1897c8..43a2476c 100644
--- a/lib/ngtcp2_acktr.c
+++ b/lib/ngtcp2_acktr.c
@@ -55,7 +55,7 @@ int ngtcp2_acktr_add(ngtcp2_acktr *acktr, ngtcp2_acktr_entry *ent) {
     }
     /* TODO What to do if we receive duplicated packet number? */
     if ((*pent)->pkt_num == ent->pkt_num) {
-      return NGTCP2_ERR_INVALID_ARGUMENT;
+      return NGTCP2_ERR_PROTO;
     }
     break;
   }
diff --git a/lib/ngtcp2_acktr.h b/lib/ngtcp2_acktr.h
index ae63f065..29a7d049 100644
--- a/lib/ngtcp2_acktr.h
+++ b/lib/ngtcp2_acktr.h
@@ -85,7 +85,7 @@ void ngtcp2_acktr_free(ngtcp2_acktr *acktr);
  * This function returns 0 if it succeeds, or one of the following
  * negative error codes:
  *
- * NGTCP2_ERR_INVALID_ARGUMENT
+ * NGTCP2_ERR_PROTO
  *     Same packet number has already been included in |acktr|.
  */
 int ngtcp2_acktr_add(ngtcp2_acktr *acktr, ngtcp2_acktr_entry *ent);
diff --git a/lib/ngtcp2_conn.c b/lib/ngtcp2_conn.c
index b741d324..a3fdd91e 100644
--- a/lib/ngtcp2_conn.c
+++ b/lib/ngtcp2_conn.c
@@ -166,7 +166,10 @@ static int conn_new(ngtcp2_conn **pconn, uint64_t conn_id, uint32_t version,
     rv = NGTCP2_ERR_NOMEM;
     goto fail_strm0_malloc;
   }
-  rv = ngtcp2_strm_init((*pconn)->strm0, 0, NGTCP2_STRM_FLAG_NONE, NULL, mem);
+  /* TODO Initial max_stream_data for stream 0? */
+  rv = ngtcp2_strm_init((*pconn)->strm0, 0, NGTCP2_STRM_FLAG_NONE,
+                        NGTCP2_STRM0_MAX_STREAM_DATA,
+                        NGTCP2_STRM0_MAX_STREAM_DATA, NULL, mem);
   if (rv != 0) {
     goto fail_strm0_init;
   }
@@ -236,8 +239,8 @@ int ngtcp2_conn_client_new(ngtcp2_conn **pconn, uint64_t conn_id,
   /* TODO Since transport parameters are not required for interop now,
      just supply sensible default here.  Remove this when transport
      parameter gets mandatory. */
-  (*pconn)->remote_settings.max_stream_data = 128 * 1024;
-  (*pconn)->remote_settings.max_data = 128;
+  (*pconn)->remote_settings.max_stream_data = 64 * 1024;
+  (*pconn)->remote_settings.max_data = 64;
   (*pconn)->remote_settings.max_stream_id = 1;
 
   return 0;
@@ -262,8 +265,8 @@ int ngtcp2_conn_server_new(ngtcp2_conn **pconn, uint64_t conn_id,
   /* TODO Since transport parameters are not required for interop now,
      just supply sensible default here.  Remove this when transport
      parameter gets mandatory. */
-  (*pconn)->remote_settings.max_stream_data = 128 * 1024;
-  (*pconn)->remote_settings.max_data = 128;
+  (*pconn)->remote_settings.max_stream_data = 64 * 1024;
+  (*pconn)->remote_settings.max_data = 64;
   (*pconn)->remote_settings.max_stream_id = 0;
 
   return 0;
@@ -420,10 +423,11 @@ static ssize_t conn_retransmit_unprotected(ngtcp2_conn *conn, uint8_t *dest,
   int rv;
   ngtcp2_upe upe;
   ngtcp2_pkt_hd hd = ent->hd;
-  ngtcp2_frame_chain **pfrc;
+  ngtcp2_frame_chain **pfrc, *frc;
   ngtcp2_rtb_entry *nent = NULL;
   ngtcp2_frame localfr;
   int pkt_empty = 1;
+  int send_pkt_cb_called = 0;
 
   /* This is required because ent->hd may have old client version. */
   hd.version = conn->version;
@@ -437,29 +441,41 @@ static ssize_t conn_retransmit_unprotected(ngtcp2_conn *conn, uint8_t *dest,
     return rv;
   }
 
-  rv = conn_call_send_pkt(conn, &hd);
-  if (rv != 0) {
-    return rv;
-  }
-
   /* TODO Don't include ACK in this unprotected packet in order not to
      ack protected packet here for now. */
 
-  for (pfrc = &ent->frc; *pfrc; pfrc = &(*pfrc)->next) {
+  for (pfrc = &ent->frc; *pfrc;) {
+    if ((*pfrc)->fr.type == NGTCP2_FRAME_MAX_STREAM_DATA) {
+      if ((*pfrc)->fr.max_stream_data.max_stream_data <
+          conn->strm0->unsent_max_rx_offset) {
+        frc = *pfrc;
+        *pfrc = (*pfrc)->next;
+        ngtcp2_frame_chain_del(frc, conn->mem);
+        continue;
+      }
+    }
     rv = ngtcp2_upe_encode_frame(&upe, &(*pfrc)->fr);
     if (rv != 0) {
       if (rv == NGTCP2_ERR_NOBUF) {
         break;
       }
     }
+
+    if (!send_pkt_cb_called) {
+      rv = conn_call_send_pkt(conn, &hd);
+      if (rv != 0) {
+        return rv;
+      }
+      send_pkt_cb_called = 1;
+    }
+
     rv = conn_call_send_frame(conn, &hd, &(*pfrc)->fr);
     if (rv != 0) {
       return rv;
     }
-  }
 
-  if (*pfrc != ent->frc) {
     pkt_empty = 0;
+    pfrc = &(*pfrc)->next;
   }
 
   if (pkt_empty) {
@@ -522,6 +538,8 @@ static ssize_t conn_retransmit_protected(ngtcp2_conn *conn, uint8_t *dest,
   int pkt_empty = 1;
   ssize_t nwrite;
   ngtcp2_crypto_ctx ctx;
+  ngtcp2_strm *strm;
+  int send_pkt_cb_called = 0;
 
   /* This is required because ent->hd may have old client version. */
   hd.version = conn->version;
@@ -540,11 +558,6 @@ static ssize_t conn_retransmit_protected(ngtcp2_conn *conn, uint8_t *dest,
     return rv;
   }
 
-  rv = conn_call_send_pkt(conn, &hd);
-  if (rv != 0) {
-    return rv;
-  }
-
   localfr.type = !NGTCP2_FRAME_ACK;
   rv = conn_create_ack_frame(conn, &localfr.ack, ts);
   if (rv != 0) {
@@ -555,6 +568,15 @@ static ssize_t conn_retransmit_protected(ngtcp2_conn *conn, uint8_t *dest,
     if (rv != 0) {
       return rv;
     }
+
+    if (!send_pkt_cb_called) {
+      rv = conn_call_send_pkt(conn, &hd);
+      if (rv != 0) {
+        return rv;
+      }
+      send_pkt_cb_called = 1;
+    }
+
     rv = conn_call_send_frame(conn, &hd, &localfr);
     if (rv != 0) {
       return rv;
@@ -564,12 +586,28 @@ static ssize_t conn_retransmit_protected(ngtcp2_conn *conn, uint8_t *dest,
   }
 
   for (pfrc = &ent->frc; *pfrc;) {
-    if ((*pfrc)->fr.type == NGTCP2_FRAME_MAX_STREAM_ID &&
-        (*pfrc)->fr.max_stream_id.max_stream_id < conn->max_remote_stream_id) {
-      frc = *pfrc;
-      pfrc = &(*pfrc)->next;
-      ngtcp2_frame_chain_del(frc, conn->mem);
-      continue;
+    switch ((*pfrc)->fr.type) {
+    case NGTCP2_FRAME_MAX_STREAM_ID:
+      if ((*pfrc)->fr.max_stream_id.max_stream_id <
+          conn->max_remote_stream_id) {
+        frc = *pfrc;
+        *pfrc = (*pfrc)->next;
+        ngtcp2_frame_chain_del(frc, conn->mem);
+        continue;
+      }
+      break;
+    case NGTCP2_FRAME_MAX_STREAM_DATA:
+      strm =
+          ngtcp2_conn_find_stream(conn, (*pfrc)->fr.max_stream_data.stream_id);
+      if (strm == NULL ||
+          (*pfrc)->fr.max_stream_data.max_stream_data <
+              strm->unsent_max_rx_offset) {
+        frc = *pfrc;
+        *pfrc = (*pfrc)->next;
+        ngtcp2_frame_chain_del(frc, conn->mem);
+        continue;
+      }
+      break;
     }
     rv = ngtcp2_ppe_encode_frame(&ppe, &(*pfrc)->fr);
     if (rv != 0) {
@@ -577,16 +615,22 @@ static ssize_t conn_retransmit_protected(ngtcp2_conn *conn, uint8_t *dest,
         break;
       }
     }
+
+    if (!send_pkt_cb_called) {
+      rv = conn_call_send_pkt(conn, &hd);
+      if (rv != 0) {
+        return rv;
+      }
+      send_pkt_cb_called = 1;
+    }
+
     rv = conn_call_send_frame(conn, &hd, &(*pfrc)->fr);
     if (rv != 0) {
       return rv;
     }
 
-    pfrc = &(*pfrc)->next;
-  }
-
-  if (*pfrc != ent->frc) {
     pkt_empty = 0;
+    pfrc = &(*pfrc)->next;
   }
 
   if (pkt_empty) {
@@ -713,12 +757,14 @@ static ssize_t conn_encode_handshake_pkt(ngtcp2_conn *conn, uint8_t *dest,
   int rv;
   ngtcp2_upe upe;
   ngtcp2_pkt_hd hd;
-  ngtcp2_frame_chain *frc = NULL;
+  ngtcp2_frame_chain *frc = NULL, **pfrc, *frc_head = NULL, *frc_next;
   ngtcp2_frame *fr, localfr;
   size_t nwrite;
   ngtcp2_rtb_entry *rtbent;
   int pkt_empty = 1;
 
+  pfrc = &frc_head;
+
   ngtcp2_pkt_hd_init(&hd, NGTCP2_PKT_FLAG_LONG_FORM, type, conn->conn_id,
                      conn->next_tx_pkt_num, conn->version);
 
@@ -740,15 +786,46 @@ static ssize_t conn_encode_handshake_pkt(ngtcp2_conn *conn, uint8_t *dest,
     /* TODO Should we retransmit ACK frame? */
     rv = conn_create_ack_frame(conn, &localfr.ack, ts);
     if (rv != 0) {
-      goto fail;
+      return rv;
     }
     if (localfr.type == NGTCP2_FRAME_ACK) {
       rv = ngtcp2_upe_encode_frame(&upe, &localfr);
       if (rv != 0) {
-        goto fail;
+        return rv;
       }
 
       rv = conn_call_send_frame(conn, &hd, &localfr);
+      if (rv != 0) {
+        return rv;
+      }
+
+      pkt_empty = 0;
+    }
+
+    if (conn->strm0->max_rx_offset < conn->strm0->unsent_max_rx_offset) {
+      rv = ngtcp2_frame_chain_new(&frc, conn->mem);
+      if (rv != 0) {
+        return rv;
+      }
+
+      *pfrc = frc;
+      pfrc = &frc->next;
+
+      frc->fr.type = NGTCP2_FRAME_MAX_STREAM_DATA;
+      frc->fr.max_stream_data.stream_id = 0;
+      frc->fr.max_stream_data.max_stream_data =
+          conn->strm0->unsent_max_rx_offset;
+
+      /* TODO If we get NGTCP2_ERR_NOBUF below, we lose
+         MAX_STREAM_DATA update. */
+      conn->strm0->max_rx_offset = conn->strm0->unsent_max_rx_offset;
+
+      rv = ngtcp2_upe_encode_frame(&upe, &frc->fr);
+      if (rv != 0) {
+        goto fail;
+      }
+
+      rv = conn_call_send_frame(conn, &hd, &frc->fr);
       if (rv != 0) {
         goto fail;
       }
@@ -769,6 +846,8 @@ static ssize_t conn_encode_handshake_pkt(ngtcp2_conn *conn, uint8_t *dest,
 
   nwrite = ngtcp2_min(ngtcp2_buf_len(tx_buf),
                       ngtcp2_upe_left(&upe) - NGTCP2_STREAM_OVERHEAD);
+  nwrite =
+      ngtcp2_min(nwrite, conn->strm0->max_tx_offset - conn->strm0->tx_offset);
 
   if (nwrite > 0) {
     rv = ngtcp2_frame_chain_new(&frc, conn->mem);
@@ -776,6 +855,9 @@ static ssize_t conn_encode_handshake_pkt(ngtcp2_conn *conn, uint8_t *dest,
       goto fail;
     }
 
+    *pfrc = frc;
+    pfrc = &frc->next;
+
     fr = &frc->fr;
 
     /* TODO Make a function to create STREAM frame */
@@ -814,9 +896,9 @@ static ssize_t conn_encode_handshake_pkt(ngtcp2_conn *conn, uint8_t *dest,
 
   ++conn->next_tx_pkt_num;
 
-  if (frc) {
-    rv = ngtcp2_rtb_entry_new(&rtbent, &hd, frc, ts + NGTCP2_INITIAL_EXPIRY,
-                              conn->mem);
+  if (frc_head) {
+    rv = ngtcp2_rtb_entry_new(&rtbent, &hd, frc_head,
+                              ts + NGTCP2_INITIAL_EXPIRY, conn->mem);
     if (rv != 0) {
       goto fail;
     }
@@ -831,8 +913,11 @@ static ssize_t conn_encode_handshake_pkt(ngtcp2_conn *conn, uint8_t *dest,
   return (ssize_t)ngtcp2_upe_final(&upe, NULL);
 
 fail:
-  ngtcp2_frame_chain_del(frc, conn->mem);
-
+  for (frc = frc_head; frc;) {
+    frc_next = frc->next;
+    ngtcp2_frame_chain_del(frc, conn->mem);
+    frc = frc_next;
+  }
   return rv;
 }
 
@@ -986,6 +1071,8 @@ static ssize_t conn_send_pkt(ngtcp2_conn *conn, uint8_t *dest, size_t destlen,
   ngtcp2_frame_chain **pfrc, *nfrc;
   ngtcp2_rtb_entry *ent;
   size_t left;
+  ngtcp2_strm *strm, *strm_next;
+  int send_pkt_cb_called = 0;
 
   ackfr.type = !NGTCP2_FRAME_ACK;
   rv = conn_create_ack_frame(conn, &ackfr.ack, ts);
@@ -1011,17 +1098,20 @@ static ssize_t conn_send_pkt(ngtcp2_conn *conn, uint8_t *dest, size_t destlen,
     return rv;
   }
 
-  rv = conn_call_send_pkt(conn, &hd);
-  if (rv != 0) {
-    return rv;
-  }
-
   if (ackfr.type == NGTCP2_FRAME_ACK) {
     rv = ngtcp2_ppe_encode_frame(&ppe, &ackfr);
     if (rv != 0) {
       return rv;
     }
 
+    if (!send_pkt_cb_called) {
+      rv = conn_call_send_pkt(conn, &hd);
+      if (rv != 0) {
+        return rv;
+      }
+      send_pkt_cb_called = 1;
+    }
+
     rv = conn_call_send_frame(conn, &hd, &ackfr);
     if (rv != 0) {
       return rv;
@@ -1043,6 +1133,30 @@ static ssize_t conn_send_pkt(ngtcp2_conn *conn, uint8_t *dest, size_t destlen,
        buffer is too small. */
   }
 
+  while (conn->fc_strms) {
+    strm = conn->fc_strms;
+    rv = ngtcp2_frame_chain_new(&nfrc, conn->mem);
+    if (rv != 0) {
+      return rv;
+    }
+    nfrc->fr.type = NGTCP2_FRAME_MAX_STREAM_DATA;
+    nfrc->fr.max_stream_data.stream_id = strm->stream_id;
+    nfrc->fr.max_stream_data.max_stream_data = strm->unsent_max_rx_offset;
+    nfrc->next = conn->frq;
+    conn->frq = nfrc;
+
+    strm->max_rx_offset = strm->unsent_max_rx_offset;
+
+    strm_next = strm->fc_next;
+    conn->fc_strms = strm_next;
+    if (strm_next) {
+      strm_next->fc_pprev = &conn->fc_strms;
+    }
+    strm->fc_next = NULL;
+    strm->fc_pprev = NULL;
+    strm = strm_next;
+  }
+
   for (pfrc = &conn->frq; *pfrc; pfrc = &(*pfrc)->next) {
     if ((*pfrc)->fr.type == NGTCP2_FRAME_STREAM) {
       left = ngtcp2_ppe_left(&ppe);
@@ -1075,6 +1189,15 @@ static ssize_t conn_send_pkt(ngtcp2_conn *conn, uint8_t *dest, size_t destlen,
       }
       return rv;
     }
+
+    if (!send_pkt_cb_called) {
+      rv = conn_call_send_pkt(conn, &hd);
+      if (rv != 0) {
+        return rv;
+      }
+      send_pkt_cb_called = 1;
+    }
+
     rv = conn_call_send_frame(conn, &hd, &(*pfrc)->fr);
     if (rv != 0) {
       return rv;
@@ -1219,6 +1342,20 @@ static int conn_recv_ack(ngtcp2_conn *conn, ngtcp2_ack *fr) {
   return ngtcp2_rtb_recv_ack(&conn->rtb, fr, conn);
 }
 
+static int conn_recv_max_stream_data(ngtcp2_conn *conn,
+                                     const ngtcp2_max_stream_data *fr) {
+  ngtcp2_strm *strm;
+
+  strm = ngtcp2_conn_find_stream(conn, fr->stream_id);
+  if (strm == NULL) {
+    return 0;
+  }
+
+  strm->max_tx_offset = ngtcp2_max(strm->max_tx_offset, fr->max_stream_data);
+
+  return 0;
+}
+
 static int conn_buffer_protected_pkt(ngtcp2_conn *conn, const uint8_t *pkt,
                                      size_t pktlen, ngtcp2_tstamp ts) {
   int rv;
@@ -1251,6 +1388,7 @@ static int conn_recv_handshake_pkt(ngtcp2_conn *conn, const uint8_t *pkt,
   int rv;
   int require_ack = 0;
   uint64_t rx_offset;
+  uint64_t fr_end_offset;
 
   if (!(pkt[0] & NGTCP2_HEADER_FORM_BIT)) {
     return conn_buffer_protected_pkt(conn, pkt, pktlen, ts);
@@ -1321,12 +1459,19 @@ static int conn_recv_handshake_pkt(ngtcp2_conn *conn, const uint8_t *pkt,
     require_ack |=
         fr.type != NGTCP2_FRAME_ACK && fr.type != NGTCP2_FRAME_CONNECTION_CLOSE;
 
-    if (fr.type == NGTCP2_FRAME_ACK) {
+    switch (fr.type) {
+    case NGTCP2_FRAME_ACK:
       rv = conn_recv_ack(conn, &fr.ack);
       if (rv != 0) {
         return rv;
       }
       continue;
+    case NGTCP2_FRAME_MAX_STREAM_DATA:
+      rv = conn_recv_max_stream_data(conn, &fr.max_stream_data);
+      if (rv != 0) {
+        return rv;
+      }
+      continue;
     }
 
     if (fr.type != NGTCP2_FRAME_STREAM || fr.stream.stream_id != 0 ||
@@ -1343,10 +1488,9 @@ static int conn_recv_handshake_pkt(ngtcp2_conn *conn, const uint8_t *pkt,
       continue;
     }
 
-    /* TODO Refused to receive stream data which is more than 128KiB
-       for now.  We can ditch this if flow control is implemented. */
-    if (fr.stream.offset > 128 * 1024) {
-      return NGTCP2_ERR_INTERNAL;
+    fr_end_offset = fr.stream.offset + fr.stream.datalen;
+    if (conn->strm0->max_rx_offset < fr_end_offset) {
+      return NGTCP2_ERR_FLOW_CONTROL;
     }
 
     if (fr.stream.offset <= rx_offset) {
@@ -1363,6 +1507,8 @@ static int conn_recv_handshake_pkt(ngtcp2_conn *conn, const uint8_t *pkt,
         return rv;
       }
 
+      conn->strm0->unsent_max_rx_offset += datalen;
+
       rv =
           ngtcp2_conn_emit_pending_recv_handshake(conn, conn->strm0, rx_offset);
       if (rv != 0) {
@@ -1416,7 +1562,9 @@ int ngtcp2_conn_init_stream(ngtcp2_conn *conn, ngtcp2_strm *strm,
   int rv;
 
   rv = ngtcp2_strm_init(strm, stream_id, NGTCP2_STRM_FLAG_NONE,
-                        stream_user_data, conn->mem);
+                        conn->local_settings.max_stream_data,
+                        conn->remote_settings.max_stream_data, stream_user_data,
+                        conn->mem);
   if (rv != 0) {
     ngtcp2_mem_free(conn->mem, strm);
     return rv;
@@ -1478,8 +1626,7 @@ static int conn_recv_stream(ngtcp2_conn *conn, const ngtcp2_stream *fr) {
     return NGTCP2_ERR_PROTO;
   }
 
-  if (UINT64_MAX - fr->datalen < fr->offset ||
-      conn->local_settings.max_stream_data < fr->offset + fr->datalen) {
+  if (UINT64_MAX - fr->datalen < fr->offset) {
     return NGTCP2_ERR_PROTO;
   }
 
@@ -1513,6 +1660,11 @@ static int conn_recv_stream(ngtcp2_conn *conn, const ngtcp2_stream *fr) {
   }
 
   fr_end_offset = fr->offset + fr->datalen;
+
+  if (strm->max_rx_offset < fr_end_offset) {
+    return NGTCP2_ERR_FLOW_CONTROL;
+  }
+
   strm->last_rx_offset = ngtcp2_max(strm->last_rx_offset, fr_end_offset);
 
   if (fr->fin) {
@@ -1668,6 +1820,12 @@ static int conn_recv_pkt(ngtcp2_conn *conn, uint8_t *pkt, size_t pktlen,
         return rv;
       }
       break;
+    case NGTCP2_FRAME_MAX_STREAM_DATA:
+      rv = conn_recv_max_stream_data(conn, &fr.max_stream_data);
+      if (rv != 0) {
+        return rv;
+      }
+      break;
     }
   }
 
@@ -1801,6 +1959,8 @@ int ngtcp2_conn_emit_pending_recv_handshake(ngtcp2_conn *conn,
       return rv;
     }
 
+    strm->unsent_max_rx_offset += datalen;
+
     ngtcp2_rob_pop(&strm->rob, rx_offset - datalen, datalen);
   }
 }
@@ -1819,8 +1979,11 @@ int ngtcp2_conn_sched_ack(ngtcp2_conn *conn, uint64_t pkt_num,
     return rv;
   }
 
-  /* TODO Ignore error for now */
-  ngtcp2_acktr_add(&conn->acktr, rpkt);
+  rv = ngtcp2_acktr_add(&conn->acktr, rpkt);
+  if (rv != 0) {
+    ngtcp2_acktr_entry_del(rpkt, conn->mem);
+    return rv;
+  }
 
   return 0;
 }
@@ -2074,11 +2237,6 @@ ssize_t ngtcp2_conn_write_stream(ngtcp2_conn *conn, uint8_t *dest,
     return rv;
   }
 
-  rv = conn_call_send_pkt(conn, &hd);
-  if (rv != 0) {
-    return rv;
-  }
-
   left = ngtcp2_ppe_left(&ppe);
   if (left <= NGTCP2_STREAM_OVERHEAD) {
     return NGTCP2_ERR_NOBUF;
@@ -2088,6 +2246,16 @@ ssize_t ngtcp2_conn_write_stream(ngtcp2_conn *conn, uint8_t *dest,
 
   /* TODO Take into account flow control credit here */
   ndatalen = ngtcp2_min(datalen, left);
+  ndatalen = ngtcp2_min(ndatalen, strm->max_tx_offset - strm->tx_offset);
+
+  if (datalen > 0 && ndatalen == 0) {
+    return NGTCP2_ERR_STREAM_DATA_BLOCKED;
+  }
+
+  rv = conn_call_send_pkt(conn, &hd);
+  if (rv != 0) {
+    return rv;
+  }
 
   rv = ngtcp2_frame_chain_new(&frc, conn->mem);
   if (rv != 0) {
@@ -2123,6 +2291,7 @@ ssize_t ngtcp2_conn_write_stream(ngtcp2_conn *conn, uint8_t *dest,
   rv = ngtcp2_rtb_entry_new(&ent, &hd, frc, ts + NGTCP2_INITIAL_EXPIRY,
                             conn->mem);
   if (rv != 0) {
+    ngtcp2_frame_chain_del(frc, conn->mem);
     return rv;
   }
 
@@ -2163,6 +2332,13 @@ int ngtcp2_conn_close_stream(ngtcp2_conn *conn, ngtcp2_strm *strm) {
     conn->max_remote_stream_id += 2;
   }
 
+  if (strm->fc_pprev) {
+    *strm->fc_pprev = strm->fc_next;
+    if (strm->fc_next) {
+      strm->fc_next->fc_pprev = strm->fc_pprev;
+    }
+  }
+
   ngtcp2_strm_free(strm);
   ngtcp2_mem_free(conn->mem, strm);
 
@@ -2177,3 +2353,32 @@ int ngtcp2_conn_close_stream_if_shut_rdwr(ngtcp2_conn *conn,
   }
   return 0;
 }
+
+int ngtcp2_conn_extend_max_stream_offset(ngtcp2_conn *conn, uint32_t stream_id,
+                                         size_t datalen) {
+  ngtcp2_strm *strm;
+
+  if (stream_id == 0) {
+    return NGTCP2_ERR_INVALID_ARGUMENT;
+  }
+
+  strm = ngtcp2_conn_find_stream(conn, stream_id);
+  if (strm == NULL) {
+    return NGTCP2_ERR_INVALID_ARGUMENT;
+  }
+
+  if (strm->unsent_max_rx_offset <= UINT64_MAX - datalen) {
+    strm->unsent_max_rx_offset += datalen;
+  }
+
+  if (!strm->fc_pprev) {
+    strm->fc_pprev = &conn->fc_strms;
+    if (conn->fc_strms) {
+      strm->fc_next = conn->fc_strms;
+      conn->fc_strms->fc_pprev = &strm->fc_next;
+    }
+    conn->fc_strms = strm;
+  }
+
+  return 0;
+}
diff --git a/lib/ngtcp2_conn.h b/lib/ngtcp2_conn.h
index ff7f4369..ad7e7cd4 100644
--- a/lib/ngtcp2_conn.h
+++ b/lib/ngtcp2_conn.h
@@ -60,6 +60,10 @@ typedef enum {
    packets buffered which arrive before handshake completes. */
 #define NGTCP2_MAX_NUM_BUFFED_RX_PPKTS 16
 
+/* NGTCP2_STRM0_MAX_STREAM_DATA is the maximum stream offset that an
+   endpoint can send initially. */
+#define NGTCP2_STRM0_MAX_STREAM_DATA 65535
+
 struct ngtcp2_pkt_chain;
 typedef struct ngtcp2_pkt_chain ngtcp2_pkt_chain;
 
@@ -98,6 +102,7 @@ struct ngtcp2_conn {
   ngtcp2_conn_callbacks callbacks;
   ngtcp2_strm *strm0;
   ngtcp2_map strms;
+  ngtcp2_strm *fc_strms;
   ngtcp2_idtr local_idtr;
   ngtcp2_idtr remote_idtr;
   uint64_t conn_id;
diff --git a/lib/ngtcp2_err.c b/lib/ngtcp2_err.c
index 22a7cb8e..a879a439 100644
--- a/lib/ngtcp2_err.c
+++ b/lib/ngtcp2_err.c
@@ -44,6 +44,10 @@ const char *ngtcp2_strerror(int liberr) {
     return "ERR_STREAM_ID_BLOCKED";
   case NGTCP2_ERR_STREAM_IN_USE:
     return "ERR_STREAM_IN_USE";
+  case NGTCP2_ERR_STREAM_DATA_BLOCKED:
+    return "ERR_STREAM_DATA_BLOCKED";
+  case NGTCP2_ERR_FLOW_CONTROL:
+    return "ERR_FLOW_CONTROL";
   case NGTCP2_ERR_NOMEM:
     return "ERR_NOMEM";
   case NGTCP2_ERR_CALLBACK_FAILURE:
diff --git a/lib/ngtcp2_strm.c b/lib/ngtcp2_strm.c
index e70fbd5a..832fd072 100644
--- a/lib/ngtcp2_strm.c
+++ b/lib/ngtcp2_strm.c
@@ -27,6 +27,7 @@
 #include <string.h>
 
 int ngtcp2_strm_init(ngtcp2_strm *strm, uint32_t stream_id, uint32_t flags,
+                     uint64_t max_rx_offset, uint64_t max_tx_offset,
                      void *stream_user_data, ngtcp2_mem *mem) {
   int rv;
 
@@ -36,9 +37,13 @@ int ngtcp2_strm_init(ngtcp2_strm *strm, uint32_t stream_id, uint32_t flags,
   strm->stream_id = stream_id;
   strm->flags = flags;
   strm->stream_user_data = stream_user_data;
+  strm->max_rx_offset = strm->unsent_max_rx_offset = max_rx_offset;
+  strm->max_tx_offset = max_tx_offset;
   strm->me.key = stream_id;
   strm->me.next = NULL;
   strm->mem = mem;
+  strm->fc_pprev = NULL;
+  strm->fc_next = NULL;
   memset(&strm->tx_buf, 0, sizeof(strm->tx_buf));
 
   rv = ngtcp2_gaptr_init(&strm->acked_tx_offset, mem);
diff --git a/lib/ngtcp2_strm.h b/lib/ngtcp2_strm.h
index 2d33ea1a..c681c916 100644
--- a/lib/ngtcp2_strm.h
+++ b/lib/ngtcp2_strm.h
@@ -48,14 +48,28 @@ typedef enum {
       NGTCP2_STRM_FLAG_SHUT_RD | NGTCP2_STRM_FLAG_SHUT_WR,
 } ngtcp2_strm_flags;
 
-typedef struct {
+struct ngtcp2_strm;
+
+typedef struct ngtcp2_strm ngtcp2_strm;
+
+struct ngtcp2_strm {
   ngtcp2_map_entry me;
   uint64_t tx_offset;
   ngtcp2_gaptr acked_tx_offset;
+  /* max_tx_offset is the maximum offset that local endpoint can send
+     for this stream. */
+  uint64_t max_tx_offset;
   /* last_rx_offset is the largest offset of stream data received for
      this stream. */
   uint64_t last_rx_offset;
   ngtcp2_rob rob;
+  /* max_rx_offset is the maximum offset that remote endpoint can send
+     to this stream. */
+  uint64_t max_rx_offset;
+  /* unsent_max_rx_offset is the maximum offset that remote endpoint
+     can send to this stream, and it is not notified to the remote
+     endpoint.  unsent_max_rx_offset >= max_rx_offset must be hold. */
+  uint64_t unsent_max_rx_offset;
   ngtcp2_mem *mem;
   size_t nbuffered;
   ngtcp2_buf tx_buf;
@@ -63,9 +77,11 @@ typedef struct {
   void *stream_user_data;
   /* flags is bit-wise OR of zero or more of ngtcp2_strm_flags. */
   uint32_t flags;
-} ngtcp2_strm;
+  ngtcp2_strm **fc_pprev, *fc_next;
+};
 
 int ngtcp2_strm_init(ngtcp2_strm *strm, uint32_t stream_id, uint32_t flags,
+                     uint64_t max_rx_offset, uint64_t max_tx_offset,
                      void *stream_user_data, ngtcp2_mem *mem);
 
 void ngtcp2_strm_free(ngtcp2_strm *strm);
diff --git a/tests/main.c b/tests/main.c
index eb214ff5..767e772a 100644
--- a/tests/main.c
+++ b/tests/main.c
@@ -116,7 +116,13 @@ int main() {
       !CU_add_test(pSuite, "rtb_recv_ack", test_ngtcp2_rtb_recv_ack) ||
       !CU_add_test(pSuite, "idtr_open", test_ngtcp2_idtr_open) ||
       !CU_add_test(pSuite, "conn_stream_open_close",
-                   test_ngtcp2_conn_stream_open_close)) {
+                   test_ngtcp2_conn_stream_open_close) ||
+      !CU_add_test(pSuite, "conn_stream_rx_flow_control",
+                   test_ngtcp2_conn_stream_rx_flow_control) ||
+      !CU_add_test(pSuite, "conn_stream_rx_flow_control_error",
+                   test_ngtcp2_conn_stream_rx_flow_control_error) ||
+      !CU_add_test(pSuite, "conn_stream_tx_flow_control",
+                   test_ngtcp2_conn_stream_tx_flow_control)) {
     CU_cleanup_registry();
     return CU_get_error();
   }
diff --git a/tests/ngtcp2_acktr_test.c b/tests/ngtcp2_acktr_test.c
index 03f88e47..199ae3b1 100644
--- a/tests/ngtcp2_acktr_test.c
+++ b/tests/ngtcp2_acktr_test.c
@@ -76,7 +76,7 @@ void test_ngtcp2_acktr_add(void) {
 
   rv = ngtcp2_acktr_add(&acktr, &ents[0]);
 
-  CU_ASSERT(NGTCP2_ERR_INVALID_ARGUMENT == rv);
+  CU_ASSERT(NGTCP2_ERR_PROTO == rv);
 
   ngtcp2_acktr_free(&acktr);
 }
diff --git a/tests/ngtcp2_conn_test.c b/tests/ngtcp2_conn_test.c
index 03c714cd..f41b3a3f 100644
--- a/tests/ngtcp2_conn_test.c
+++ b/tests/ngtcp2_conn_test.c
@@ -69,9 +69,18 @@ static ssize_t null_decrypt(ngtcp2_conn *conn, uint8_t *dest, size_t destlen,
 }
 
 static void server_default_settings(ngtcp2_settings *settings) {
-  settings->max_stream_data = 65536;
+  settings->max_stream_data = 65535;
   settings->max_data = 128;
-  settings->max_stream_id = 3;
+  settings->max_stream_id = 5;
+  settings->idle_timeout = 60;
+  settings->omit_connection_id = 0;
+  settings->max_packet_size = 65535;
+}
+
+static void client_default_settings(ngtcp2_settings *settings) {
+  settings->max_stream_data = 65535;
+  settings->max_data = 128;
+  settings->max_stream_id = 0;
   settings->idle_timeout = 60;
   settings->omit_connection_id = 0;
   settings->max_packet_size = 65535;
@@ -81,29 +90,52 @@ static uint8_t null_key[16];
 static uint8_t null_iv[16];
 static uint8_t null_data[4096];
 
-void test_ngtcp2_conn_stream_open_close(void) {
-  ngtcp2_conn *conn;
+static void setup_default_server(ngtcp2_conn **pconn) {
   ngtcp2_conn_callbacks cb;
   ngtcp2_settings settings;
-  uint8_t buf[2048];
-  size_t pktlen;
-  ssize_t spktlen;
-  int rv;
-  ngtcp2_frame fr;
-  ngtcp2_strm *strm;
 
   memset(&cb, 0, sizeof(cb));
   cb.decrypt = null_decrypt;
   cb.encrypt = null_encrypt;
   server_default_settings(&settings);
 
-  ngtcp2_conn_server_new(&conn, 0x1, NGTCP2_PROTO_VERSION, &cb, &settings,
+  ngtcp2_conn_server_new(pconn, 0x1, NGTCP2_PROTO_VERSION, &cb, &settings,
+                         NULL);
+  ngtcp2_conn_update_tx_keys(*pconn, null_key, sizeof(null_key), null_iv,
+                             sizeof(null_iv));
+  ngtcp2_conn_update_rx_keys(*pconn, null_key, sizeof(null_key), null_iv,
+                             sizeof(null_iv));
+  (*pconn)->state = NGTCP2_CS_POST_HANDSHAKE;
+}
+
+static void setup_default_client(ngtcp2_conn **pconn) {
+  ngtcp2_conn_callbacks cb;
+  ngtcp2_settings settings;
+
+  memset(&cb, 0, sizeof(cb));
+  cb.decrypt = null_decrypt;
+  cb.encrypt = null_encrypt;
+  client_default_settings(&settings);
+
+  ngtcp2_conn_client_new(pconn, 0x1, NGTCP2_PROTO_VERSION, &cb, &settings,
                          NULL);
-  ngtcp2_conn_update_tx_keys(conn, null_key, sizeof(null_key), null_iv,
+  ngtcp2_conn_update_tx_keys(*pconn, null_key, sizeof(null_key), null_iv,
                              sizeof(null_iv));
-  ngtcp2_conn_update_rx_keys(conn, null_key, sizeof(null_key), null_iv,
+  ngtcp2_conn_update_rx_keys(*pconn, null_key, sizeof(null_key), null_iv,
                              sizeof(null_iv));
-  conn->state = NGTCP2_CS_POST_HANDSHAKE;
+  (*pconn)->state = NGTCP2_CS_POST_HANDSHAKE;
+}
+
+void test_ngtcp2_conn_stream_open_close(void) {
+  ngtcp2_conn *conn;
+  uint8_t buf[2048];
+  size_t pktlen;
+  ssize_t spktlen;
+  int rv;
+  ngtcp2_frame fr;
+  ngtcp2_strm *strm;
+
+  setup_default_server(&conn);
 
   fr.type = NGTCP2_FRAME_STREAM;
   fr.stream.flags = 0;
@@ -113,7 +145,7 @@ void test_ngtcp2_conn_stream_open_close(void) {
   fr.stream.datalen = 17;
   fr.stream.data = null_data;
 
-  pktlen = write_stream_pkt(conn, buf, sizeof(buf), 0xc, 1, &fr);
+  pktlen = write_single_frame_pkt(conn, buf, sizeof(buf), 0xc, 1, &fr);
 
   rv = ngtcp2_conn_recv(conn, buf, pktlen, 1);
 
@@ -127,7 +159,7 @@ void test_ngtcp2_conn_stream_open_close(void) {
   fr.stream.offset = 17;
   fr.stream.datalen = 0;
 
-  pktlen = write_stream_pkt(conn, buf, sizeof(buf), 0xc, 2, &fr);
+  pktlen = write_single_frame_pkt(conn, buf, sizeof(buf), 0xc, 2, &fr);
 
   rv = ngtcp2_conn_recv(conn, buf, pktlen, 2);
 
@@ -147,3 +179,161 @@ void test_ngtcp2_conn_stream_open_close(void) {
 
   ngtcp2_conn_del(conn);
 }
+
+void test_ngtcp2_conn_stream_rx_flow_control(void) {
+  ngtcp2_conn *conn;
+  uint8_t buf[2048];
+  size_t pktlen;
+  ssize_t spktlen;
+  int rv;
+  ngtcp2_frame fr;
+  ngtcp2_strm *strm;
+  size_t i;
+
+  setup_default_server(&conn);
+
+  conn->local_settings.max_stream_data = 2047;
+  conn->local_settings.max_stream_id = 5;
+
+  for (i = 0; i < 3; ++i) {
+    uint32_t stream_id = (uint32_t)(i * 2 + 1);
+    fr.type = NGTCP2_FRAME_STREAM;
+    fr.stream.flags = 0;
+    fr.stream.stream_id = stream_id;
+    fr.stream.fin = 0;
+    fr.stream.offset = 0;
+    fr.stream.datalen = 1024;
+    fr.stream.data = null_data;
+
+    pktlen = write_single_frame_pkt(conn, buf, sizeof(buf), 0xc, i, &fr);
+    rv = ngtcp2_conn_recv(conn, buf, pktlen, 1);
+
+    CU_ASSERT(0 == rv);
+
+    strm = ngtcp2_conn_find_stream(conn, stream_id);
+
+    CU_ASSERT(NULL != strm);
+
+    rv = ngtcp2_conn_extend_max_stream_offset(conn, stream_id,
+                                              fr.stream.datalen);
+
+    CU_ASSERT(0 == rv);
+  }
+
+  strm = conn->fc_strms;
+
+  CU_ASSERT(5 == strm->stream_id);
+
+  strm = strm->fc_next;
+
+  CU_ASSERT(3 == strm->stream_id);
+
+  strm = strm->fc_next;
+
+  CU_ASSERT(1 == strm->stream_id);
+
+  strm = strm->fc_next;
+
+  CU_ASSERT(NULL == strm);
+
+  spktlen = ngtcp2_conn_send(conn, buf, sizeof(buf), 2);
+
+  CU_ASSERT(spktlen > 0);
+  CU_ASSERT(NULL == conn->fc_strms);
+
+  for (i = 0; i < 3; ++i) {
+    uint32_t stream_id = (uint32_t)(i * 2 + 1);
+    strm = ngtcp2_conn_find_stream(conn, stream_id);
+
+    CU_ASSERT(2047 + 1024 == strm->max_rx_offset);
+  }
+
+  ngtcp2_conn_del(conn);
+}
+
+void test_ngtcp2_conn_stream_rx_flow_control_error(void) {
+  ngtcp2_conn *conn;
+  uint8_t buf[2048];
+  size_t pktlen;
+  int rv;
+  ngtcp2_frame fr;
+
+  setup_default_server(&conn);
+
+  conn->local_settings.max_stream_data = 1023;
+
+  fr.type = NGTCP2_FRAME_STREAM;
+  fr.stream.flags = 0;
+  fr.stream.stream_id = 1;
+  fr.stream.fin = 0;
+  fr.stream.offset = 0;
+  fr.stream.datalen = 1024;
+  fr.stream.data = null_data;
+
+  pktlen = write_single_frame_pkt(conn, buf, sizeof(buf), 0xc, 1, &fr);
+  rv = ngtcp2_conn_recv(conn, buf, pktlen, 1);
+
+  CU_ASSERT(NGTCP2_ERR_FLOW_CONTROL == rv);
+
+  ngtcp2_conn_del(conn);
+}
+
+void test_ngtcp2_conn_stream_tx_flow_control(void) {
+  ngtcp2_conn *conn;
+  uint8_t buf[2048];
+  size_t pktlen;
+  ssize_t spktlen;
+  int rv;
+  ngtcp2_frame fr;
+  ngtcp2_strm *strm;
+  size_t nwrite;
+
+  setup_default_client(&conn);
+
+  conn->remote_settings.max_stream_data = 2047;
+  conn->remote_settings.max_stream_id = 5;
+
+  rv = ngtcp2_conn_open_stream(conn, 1, NULL);
+
+  CU_ASSERT(0 == rv);
+
+  strm = ngtcp2_conn_find_stream(conn, 1);
+  spktlen = ngtcp2_conn_write_stream(conn, buf, sizeof(buf), &nwrite, 1, 0,
+                                     null_data, 1024, 1);
+
+  CU_ASSERT(spktlen > 0);
+  CU_ASSERT(1024 == nwrite);
+  CU_ASSERT(1024 == strm->tx_offset);
+
+  spktlen = ngtcp2_conn_write_stream(conn, buf, sizeof(buf), &nwrite, 1, 0,
+                                     null_data, 1024, 2);
+
+  CU_ASSERT(spktlen > 0);
+  CU_ASSERT(1023 == nwrite);
+  CU_ASSERT(2047 == strm->tx_offset);
+
+  spktlen = ngtcp2_conn_write_stream(conn, buf, sizeof(buf), &nwrite, 1, 0,
+                                     null_data, 1024, 3);
+
+  CU_ASSERT(NGTCP2_ERR_STREAM_DATA_BLOCKED == spktlen);
+
+  fr.type = NGTCP2_FRAME_MAX_STREAM_DATA;
+  fr.max_stream_data.stream_id = 1;
+  fr.max_stream_data.max_stream_data = 2048;
+
+  pktlen = write_single_frame_pkt(conn, buf, sizeof(buf), 0xc, 1, &fr);
+
+  rv = ngtcp2_conn_recv(conn, buf, pktlen, 4);
+
+  CU_ASSERT(0 == rv);
+  CU_ASSERT(2048 == strm->max_tx_offset);
+
+  spktlen = ngtcp2_conn_write_stream(conn, buf, sizeof(buf), &nwrite, 1, 0,
+                                     null_data, 1024, 5);
+
+  CU_ASSERT(spktlen > 0);
+  CU_ASSERT(1 == nwrite);
+  CU_ASSERT(2048 == strm->tx_offset);
+
+  ngtcp2_conn_del(conn);
+}
diff --git a/tests/ngtcp2_conn_test.h b/tests/ngtcp2_conn_test.h
index c196843c..7c09d04d 100644
--- a/tests/ngtcp2_conn_test.h
+++ b/tests/ngtcp2_conn_test.h
@@ -30,5 +30,8 @@
 #endif /* HAVE_CONFIG_H */
 
 void test_ngtcp2_conn_stream_open_close(void);
+void test_ngtcp2_conn_stream_rx_flow_control(void);
+void test_ngtcp2_conn_stream_rx_flow_control_error(void);
+void test_ngtcp2_conn_stream_tx_flow_control(void);
 
 #endif /* NGTCP2_CONN_TEST_H */
diff --git a/tests/ngtcp2_test_helper.c b/tests/ngtcp2_test_helper.c
index 05ebed2b..180cc4ea 100644
--- a/tests/ngtcp2_test_helper.c
+++ b/tests/ngtcp2_test_helper.c
@@ -120,9 +120,9 @@ static ssize_t null_encrypt(ngtcp2_conn *conn, uint8_t *dest, size_t destlen,
   return (ssize_t)plaintextlen;
 }
 
-size_t write_stream_pkt(ngtcp2_conn *conn, uint8_t *out, size_t outlen,
-                        uint64_t conn_id, uint64_t pkt_num,
-                        const ngtcp2_frame *fr) {
+size_t write_single_frame_pkt(ngtcp2_conn *conn, uint8_t *out, size_t outlen,
+                              uint64_t conn_id, uint64_t pkt_num,
+                              const ngtcp2_frame *fr) {
   ngtcp2_crypto_ctx ctx;
   ngtcp2_ppe ppe;
   ngtcp2_mem *mem = ngtcp2_mem_default();
diff --git a/tests/ngtcp2_test_helper.h b/tests/ngtcp2_test_helper.h
index e7449d77..ca92ada7 100644
--- a/tests/ngtcp2_test_helper.h
+++ b/tests/ngtcp2_test_helper.h
@@ -70,12 +70,12 @@ size_t ngtcp2_t_encode_ack_frame(uint8_t *out, uint64_t largest_ack,
                                  uint64_t ack_blklen);
 
 /*
- * write_stream_pkt writes a QUIC packet containing single frame |fr|
- * in |out| whose capacity is |outlen|.  This function returns the
- * number of bytes written.
+ * write_single_frame_pkt writes a QUIC packet containing single frame
+ * |fr| in |out| whose capacity is |outlen|.  This function returns
+ * the number of bytes written.
  */
-size_t write_stream_pkt(ngtcp2_conn *conn, uint8_t *out, size_t outlen,
-                        uint64_t conn_id, uint64_t pkt_num,
-                        const ngtcp2_frame *fr);
+size_t write_single_frame_pkt(ngtcp2_conn *conn, uint8_t *out, size_t outlen,
+                              uint64_t conn_id, uint64_t pkt_num,
+                              const ngtcp2_frame *fr);
 
 #endif /* NGTCP2_TEST_HELPER_H */
-- 
GitLab