Merge pull request #1173 from rust-lang/ch20-edits

Ch20 edits
This commit is contained in:
Steve Klabnik 2018-02-21 11:36:26 -05:00 committed by GitHub
commit 4bded3e1fc
11 changed files with 2494 additions and 2437 deletions


lifecycle
LimitTracker
lobally
locators
LockResult
login
lookup
loopback



- [Final Project: Building a Multithreaded Web Server](ch20-00-final-project-a-web-server.md)
- [A Single Threaded Web Server](ch20-01-single-threaded.md)
- [Turning our Single Threaded Server into a Multithreaded Server](ch20-02-multithreaded.md)
- [Graceful Shutdown and Cleanup](ch20-03-graceful-shutdown-and-cleanup.md)
- [Appendix](appendix-00.md)
- [A - Keywords](appendix-01-keywords.md)


# Final Project: Building a Multithreaded Web Server
It’s been a long journey, but here we are! The end of the book. Parting is such
sweet sorrow. But before we go, let’s build one more project together, to show
off some of the concepts we covered in these final chapters, as well as recap
some lessons from earlier.
For our final project, we’re going to make a web server that only says
“hello”, which will look like Figure 20-1 in a web browser:
![hello from rust](img/trpl20-01.png)
<span class="caption">Figure 20-1: Our final shared project together</span>

Here’s the plan of how we’ll build the web server:
1. Learn a little bit about TCP and HTTP
2. Listen for TCP connections on a socket
3. Parse a small number of HTTP requests
4. Create a proper HTTP response
5. Improve the throughput of our server with a thread pool
Before we get started, however, there’s one thing we should mention: the method
we use here will not be the best way to build a web server with Rust. There are
a number of production-ready crates available on *https://crates.io* that
provide much more complete web server and thread pool implementations than we
are going to build.
However, for this chapter, our intention is to help you learn, not to take the
easy route. Because Rust is a systems programming language, we’re able to
choose what level of abstraction we want to work with, and can go to a lower
level than is possible or practical in other languages. We’ll therefore write
the basic HTTP server and thread pool ourselves so you can learn the general
ideas and techniques behind the crates you might use in the future.


## Building a Single Threaded Web Server
First we’ll get a single threaded web server working, but before we begin,
let’s look at a quick overview of the protocols involved in building web
servers. The details of these protocols are beyond the scope of this book, but
a short overview will give you the information you need.
The two main protocols involved in web servers are the *Hypertext Transfer
Protocol* (*HTTP*) and the *Transmission Control Protocol* (*TCP*). Both
protocols are *request-response* protocols, meaning a *client* initiates
requests, and a *server* listens to the requests and provides a response to the
client. The contents of those requests and responses are defined by the
protocols themselves.
TCP is the lower-level protocol that describes the details of how information
gets from one server to another, but doesn’t specify what that information is.
HTTP builds on top of TCP by defining the content of the requests and
responses. It’s technically possible to use HTTP with other protocols, but in
the vast majority of cases, HTTP sends its data over TCP. We’re going to work
with the raw bytes of TCP and HTTP requests and responses.
### Listening to the TCP Connection

Our web server needs to be able to listen to a TCP connection, so that’s the
first part we’ll work on. The standard library offers a `std::net` module that
lets us do this. Let’s make a new project in the usual fashion:
```text
$ cargo new hello --bin
$ cd hello
```
Now enter the code in Listing 20-1 in `src/main.rs` to start. This code will
listen at the address `127.0.0.1:7878` for incoming TCP streams. When it gets
an incoming stream, it will print `Connection established!`:
<span class="filename">Filename: src/main.rs</span>
```rust
use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        println!("Connection established!");
    }
}
```
<span class="caption">Listing 20-1: Listening for incoming streams and printing
a message when we receive a stream</span>
The `TcpListener` allows us to listen for TCP connections. We’ve chosen to
listen to the address `127.0.0.1:7878`. Breaking this address down, the section
before the colon is an IP address representing your own computer (this is the
same on each computer, and doesn’t represent the authors’ computer
specifically), and `7878` is the port. We’ve chosen this port for two reasons:
HTTP isn’t normally accepted on this port, so our server is unlikely to
conflict with another web server you might have running, and 7878 is “rust”
typed on a telephone. Note that connecting to port 80 requires administrator
privileges; non-administrators can only listen on ports higher than 1024.
The `bind` function in this scenario works like the `new` function, in that it
will return a new `TcpListener` instance. This function is called `bind`
because, in networking, connecting to a port to listen to is known as “binding
to a port”.
The `bind` function returns a `Result<T, E>`, which indicates that binding
might fail. For example, if we tried to connect to port 80 without being an
administrator, or if we ran two instances of our program and so had two
programs listening to the same port, binding wouldn’t work. Because we’re
writing a basic server for learning purposes here, we’re not going to worry
about handling these kinds of errors, so we use `unwrap` to stop the program if
errors happen.
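As a small illustration of handling a bind error rather than calling `unwrap`
(this example is ours, not from the book’s listings), we can match on the
`Result`. Binding to port `0` asks the operating system for any free port; a
second bind to that same address then fails:

```rust
use std::net::TcpListener;

fn main() {
    // Ask the OS for any free port by binding to port 0.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    // A second listener on the same address fails with "address in use";
    // matching on the Result reports the error instead of panicking.
    match TcpListener::bind(addr) {
        Ok(_) => println!("bound a second listener (unexpected)"),
        Err(e) => eprintln!("failed to bind to {}: {}", addr, e),
    }
}
```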
The `incoming` method on `TcpListener` returns an iterator that gives us a
sequence of streams (more specifically, streams of type `TcpStream`). A single
*stream* represents an open connection between the client and the server. A
*connection* is the name for the full request/response process in which a
client connects to the server, the server generates a response, and the server
closes the connection. As such, `TcpStream` will read from itself to see what
the client sent, and allow us to write our response to the stream. Overall,
this `for` loop will process each connection in turn and produce a series of
streams for us to handle.
<!-- Below -- What if there aren't errors, how is the stream handled? Or is
there no functionality for that yet, only functionality for errors?
Also, highlighted below -- can you specify what errors we're talking
about---errors in *producing* the streams or connecting to the port?-->
<!--
There is no functionality for a stream without errors yet; I've clarified.
The errors happen when a client attempts to connect to the server; I've
clarified.
/Carol -->
For now, our handling of the stream consists of calling `unwrap` to terminate
our program if the stream has any errors; if there aren’t any errors, the
program prints a message. We’ll add more functionality for the success case in
the next listing. The reason we might receive errors from the `incoming` method
when a client connects to the server is that we’re not actually iterating over
connections; we’re iterating over *connection attempts*. The connection might
not be successful for a number of reasons, many of them operating system
specific. For example, many operating systems have a limit to the number of
simultaneous open connections they can support; new connection attempts beyond
that number will produce an error until some of the open connections are
closed.
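To make that failure handling concrete (a sketch of ours, not the book’s code),
the `for` loop can match on each connection attempt instead of calling
`unwrap`; here a client thread makes one attempt so the loop has something to
process:

```rust
use std::net::{TcpListener, TcpStream};
use std::thread;

fn main() {
    // Bind to an OS-assigned port so this demo is self-contained.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    // One connection attempt made from another thread.
    let client = thread::spawn(move || {
        TcpStream::connect(addr).unwrap();
    });

    // Match on each connection attempt: log failures instead of crashing.
    for stream in listener.incoming().take(1) {
        match stream {
            Ok(_stream) => println!("Connection established!"),
            Err(e) => eprintln!("connection attempt failed: {}", e),
        }
    }

    client.join().unwrap();
}
```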
Let’s try this code out! First invoke `cargo run` in the terminal, then load up
`127.0.0.1:7878` in a web browser. The browser should show an error message
like “Connection reset”, because the server isn’t currently sending any data
back. If you look at your terminal, though, you should see a bunch of messages
that were printed when the browser connected to the server!
```text
     Running `target/debug/hello`
Connection established!
Connection established!
```
Sometimes, you’ll see multiple messages printed out for one browser request;
that might be because the browser is making a request for the page as well as a
request for other resources, like the `favicon.ico` icon that appears in the
browser tab.

It could also be that the browser is trying to connect to the server multiple
times because the server isn’t responding with any data. When `stream` goes out
of scope and is dropped at the end of the loop, the connection is closed as
part of the `drop` implementation. Browsers sometimes deal with closed
connections by retrying, because the problem might be temporary. The important
thing is that we’ve successfully gotten a handle to a TCP connection!
Remember to stop the program with <span class="keystroke">ctrl-C</span> when
you’re done running a particular version of the code, and restart `cargo run`
after you’ve made each set of code changes to make sure you’re running the
newest code.
### Reading the Request
Let’s implement the functionality to read in the request from the browser! To
separate out the concerns of getting a connection and then taking some action
with the connection, we’ll start a new function for processing connections. In
this new `handle_connection` function, we’ll read data from the TCP stream and
print it out so we can see the data being sent from the browser. Change the
code to look like Listing 20-2:
<span class="filename">Filename: src/main.rs</span>
```rust
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];

    stream.read(&mut buffer).unwrap();

    println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
}
```
<span class="caption">Listing 20-2: Reading from the `TcpStream` and printing
out the data</span>
We bring `std::io::prelude` into scope to get access to certain traits that let
us read from and write to the stream. In the `for` loop in the `main` function,
instead of printing a message that says we made a connection, we now call the
new `handle_connection` function and pass the `stream` to it.
In the `handle_connection` function, we’ve made the `stream` parameter mutable.
This is because the `TcpStream` instance keeps track of what data it returns to
us internally. It might read more data than we asked for and save that data for
the next time we ask for data. It therefore needs to be `mut` because its
internal state might change; usually we think of “reading” as not needing
mutation, but in this case we need the `mut` keyword.
<!-- Above -- I'm not clear what state will change here, the content of stream
when the program tempers what data it takes? -->
<!-- Yes, which is what we mean by "internally". I've tried to reword a bit,
not sure if it's clearer. /Carol -->
Next, we need to actually read from the stream. We do this in two steps: first,
we declare a `buffer` on the stack to hold the data that’s read in. We’ve made
the buffer 512 bytes in size, which is big enough to hold the data of a basic
request and sufficient for our purposes in this chapter. If we wanted to handle
requests of an arbitrary size, the management of the buffer would need to be
more complicated, but we’re keeping it simple for now. We pass the buffer to
`stream.read`, which will read bytes from the `TcpStream` and put them in the
buffer.
We then convert the bytes in the buffer to a string and print out that string.
The `String::from_utf8_lossy` function takes a `&[u8]` and produces a `String`
from it. The “lossy” part of the name indicates the behavior of this function
when it sees an invalid UTF-8 sequence: it will replace the invalid sequence
with �, the `U+FFFD REPLACEMENT CHARACTER`. You might see replacement
characters for characters in the buffer that aren’t filled by request data.
Let’s give this a try! Start up the program and make a request in a web browser
again. Note that we’ll still get an error page in the browser, but the output
in the terminal will look similar to this:

```text
$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.42 secs
     Running `target/debug/hello`
Request: GET / HTTP/1.1
Host: 127.0.0.1:7878
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64; rv:52.0) Gecko/20100101
Firefox/52.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Upgrade-Insecure-Requests: 1
������������������������������������
```
You’ll probably get slightly different output depending on your browser. Now
that we’re printing out the request data, we can see why we get multiple
connections from one browser request by looking at the path after `Request:
GET`. If the repeated connections are all requesting `/`, we know the browser
is trying to fetch `/` repeatedly because it’s not getting a response from our
program.

Let’s break down this request data to understand what the browser is asking of
our program.

#### A Closer Look at an HTTP Request

HTTP is a text-based protocol, and a request takes this format:
```text
Method Request-URI HTTP-Version CRLF
headers CRLF
message-body
```
First we have the *request line* that holds information about what the client
is requesting. The first part of the request line tells us the *method* being
used, like `GET` or `POST`, which describes how the client is making this
request. Our client used a `GET` request.
<!-- Below, is that right that the / part is the URI *being requested*, and not
the URI of the requester? -->
<!-- Yes /Carol -->
The next part of the request line is `/`, which tells us the *URI* (*Uniform
Resource Identifier*) that the client is requesting. A URI is almost, but not
quite, the same as a URL (*Uniform Resource Locator*). The difference between
URIs and URLs isn’t important for our purposes in this chapter, but the HTTP
spec uses the term URI, so we can just mentally substitute URL for URI here.
Finally, we’re given the HTTP version used by the client, and then the request
line ends in a CRLF sequence. The CRLF sequence can also be written as `\r\n`:
`\r` is a *carriage return* and `\n` is a *line feed*. (These terms come from
the typewriter days!) The CRLF sequence separates the request line from the
rest of the request data. Note that when CRLF is printed out, we see a new line
started rather than `\r\n`.
```text
GET / HTTP/1.1
```
<!-- Above, I don't see a CRLF here in the request line in the actual output,
is it just implied because the next line begins on the next line? -->
<!-- Yes, I've clarified. /Carol -->
Taking a look at the request line data we received from running our program so
far, we see that `GET` is the method, `/` is the request URI, and `HTTP/1.1` is
the version.
The remaining lines starting from `Host:` onward are headers; `GET` requests
have no body.
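Because the request line is whitespace-separated text, pulling its three parts
apart is straightforward; this parsing snippet is our own illustration, not
code from the book’s listings:

```rust
fn main() {
    // The request line our server printed for a browser request to `/`.
    let request_line = "GET / HTTP/1.1";

    // Split on whitespace to get method, URI, and version in order.
    let mut parts = request_line.split_whitespace();
    let method = parts.next().unwrap();
    let uri = parts.next().unwrap();
    let version = parts.next().unwrap();

    assert_eq!(method, "GET");
    assert_eq!(uri, "/");
    assert_eq!(version, "HTTP/1.1");
}
```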
Try making a request from a different browser, or asking for a different
address like `127.0.0.1:7878/test`, to see how the request data changes, if
you’d like.
Now that we know what the browser is asking for, let’s send some data back!
### Writing a Response
We’re going to implement the sending of data in response to a client request.
Responses have the following format:
```text
HTTP-Version Status-Code Reason-Phrase CRLF
headers CRLF
message-body
```
The first line is a *status line* that contains the HTTP version used in the
response, a numeric status code that summarizes the result of the request, and
a reason phrase that provides a text description of the status code. After the
CRLF sequence comes any headers, another CRLF sequence, and the body of the
response.
Here’s an example response that uses version 1.1 of HTTP, has a status code of
200, an OK reason phrase, no headers, and no body:

```text
HTTP/1.1 200 OK\r\n\r\n
```
The status code 200 is the standard success response. The text is a tiny
successful HTTP response. Let’s write this to the stream as our response to a
successful request!

From the `handle_connection` function, we need to remove the `println!` that
was printing the request data, and replace it with the code in Listing 20-3:
<span class="filename">Filename: src/main.rs</span>
<span class="caption">Listing 20-3: Writing a tiny successful HTTP response to
the stream</span>
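As a sketch of what `handle_connection` looks like at this point, based on the
description of `response`, `as_bytes`, `write`, and `flush` (the local-socket
harness in `main` is our own demo scaffolding, not part of the book’s listing):

```rust
use std::io::prelude::*;
use std::net::{TcpListener, TcpStream};
use std::thread;

fn handle_connection(mut stream: TcpStream) {
    // Tiny successful HTTP response: status line, empty headers, no body.
    let response = "HTTP/1.1 200 OK\r\n\r\n";

    // `write` takes a `&[u8]`, so convert the string data to bytes first;
    // `flush` waits until all the bytes are written to the connection.
    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

fn main() {
    // Demo scaffolding: bind to an OS-assigned port and connect a local
    // client so we can observe the response without a browser.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    let client = thread::spawn(move || {
        let mut stream = TcpStream::connect(addr).unwrap();
        let mut received = String::new();
        // Reads until the server side closes the connection.
        stream.read_to_string(&mut received).unwrap();
        received
    });

    let (stream, _) = listener.accept().unwrap();
    handle_connection(stream); // `stream` is dropped inside, closing the connection

    assert_eq!(client.join().unwrap(), "HTTP/1.1 200 OK\r\n\r\n");
}
```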
<!-- Flagging for addition of wingdings later -->
The first new line defines the `response` variable that holds the data of the
success message. Then we call `as_bytes` on our `response` to convert the
string data to bytes. The `write` method on `stream` takes a `&[u8]` and sends
those bytes directly down the connection.
<!-- Above--So what does adding as_bytes actually do, *allow* us to send bytes
directly? -->
<!-- It converts the string data to bytes, I've clarified /Carol -->
Because the `write` operation could fail, we use `unwrap` on any error result
as before. Again, in a real application you would add error handling here.
Finally, `flush` will wait and prevent the program from continuing until all of
the bytes are written to the connection; `TcpStream` contains an internal
buffer to minimize calls into the underlying operating system.
<!-- Above -- Will flush wait until all bytes are written and then do
something? I'm not sure what task it's performing -->
<!-- `flush` just makes sure all the bytes we sent to `write` are actually
written to the stream before the function ends. Because writing to a stream
takes time, the `handle_connection` function could potentially finish and
`stream` could go out of scope before all the bytes given to `write` are sent,
unless we call `flush`. This is how streams work in many languages and is a
small detail I don't think is worth going into in depth. /Carol -->
With these changes, let’s run our code and make a request! We’re no longer
printing any data to the terminal, so we won’t see any output other than the
output from Cargo. Load `127.0.0.1:7878` in a web browser, though, and you
should get a blank page instead of an error. How exciting! You’ve just
hand-coded an HTTP request and response.
### Returning Real HTML
Let’s implement returning more than a blank page. Create a new file,
*hello.html*, in the root of your project directory (that is, not in the `src`
directory). You can put in any HTML you want; Listing 20-4 shows one
possibility:
<span class="filename">Filename: hello.html</span>
<span class="caption">Listing 20-4: A sample HTML file to return in a
response</span>
This is a minimal HTML 5 document with a heading and some text. To return this
from the server when a request is received, let’s modify `handle_connection` as
shown in Listing 20-5 to read the HTML file, add it to the response as a body,
and send it:
<span class="filename">Filename: src/main.rs</span>
body of the response</span>
We’ve added a line at the top to bring the standard library’s `File` into
scope. The code for opening files and reading their contents should look
familiar from Chapter 12, when we read the contents of a file for our I/O
project in Listing 12-4.
Next, we’re using `format!` to add the file’s contents as the body of the
success response.
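As a sketch of that `format!` call (using an inline string in place of the
contents read from *hello.html*, so this runs standalone):

```rust
fn main() {
    // Stand-in for the contents read from hello.html via File/read_to_string.
    let contents = "<!DOCTYPE html>\n<html><body><h1>Hello!</h1></body></html>";

    // Status line, a blank line ending the (empty) header section, then the body.
    let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", contents);

    assert!(response.starts_with("HTTP/1.1 200 OK\r\n\r\n"));
    println!("{}", response);
}
```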
Run it with `cargo run`, load up `127.0.0.1:8080` in your browser, and you
should see your HTML rendered!
Run this code with `cargo run`, load up `127.0.0.1:7878` in your browser, and
you should see your HTML rendered!
Note that were currently ignoring the request data in `buffer` and sending
back the contents of the HTML file unconditionally. Try requesting
`127.0.0.1:8080/something-else` in your browser and youll get back your HTML
for that request too. Sending back the same response for all requests is pretty
limited and not what most web servers do; lets examine the request and only
send back the HTML file for a well-formed request to `/`.
Currently were ignoring the request data in `buffer` and just sending back the
contents of the HTML file unconditionally. That means if you try requesting
`127.0.0.1:7878/something-else` in your browser youll still get back this same
HTML response. This makes for a pretty limited server and is not what most web
servers do. Wed like to customize our responses depending on the request, and
only send back the HTML file for a well-formed request to `/`.
### Validating the Request and Selectively Responding
Right now, our web server will return the HTML in the file no matter what the
client requested. Lets check that the browser is requesting `/`, and instead
return an error if the browser requests anything else. Lets modify
`handle_connection` as shown in Listing 20-6, which adds part of the code well
need. This part checks the content of the request we received against what we
know a request for `/` looks like and adds `if` and `else` blocks where well
add code to treat requests differently:
client requested. Lets add functionality to check that the browser is
requesting `/` before returning the HTML file, and return an error if the
browser requests anything else. For this we need to modify `handle_connection`
as shown in Listing 20-6. This new code checks the content of the request
received against what we know a request for `/` looks like and adds `if` and
`else` blocks to treat requests differently:
<span class="filename">Filename: src/main.rs</span>
@ -442,31 +487,30 @@ fn handle_connection(mut stream: TcpStream) {
}
```
<span class="caption">Listing 20-6: Matching the request against the content we
expect for a request to `/` and setting up conditionally handling requests to
`/` differently than other requests</span>
<span class="caption">Listing 20-6: Matching the request and handling requests
to `/` differently than other requests</span>
Here, we hardcoded the data corresponding to the request that were looking for
in the variable `get`. Because were reading raw bytes into the buffer, we use
a byte string, created with `b""`, to make `get` a byte string too. Then, we
check to see if `buffer` starts with the bytes in `get`. If it does, weve
gotten a well-formed request to `/`, which is the success case that we want to
handle in the `if` block. The `if` block contains the code we added in Listing
20-5 that returns the contents of our HTML file.
First, we hardcode the data corresponding to the `/` request into the `get`
variable. Because were reading raw bytes into the buffer, we transform `get`
into a byte string by adding the `b""` byte string syntax at the start of the
content data. Then, we check to see if `buffer` starts with the bytes in `get`.
If it does, it means weve received a well-formed request to `/`, which is the
success case well handle in the `if` block that returns the contents of our
HTML file.
If `buffer` does not start with the bytes in `get`, weve gotten some other
request. Well respond to all other requests using the code were about to add
in the `else` block.
If `buffer` does *not* start with the bytes in `get`, it means weve received
some other request. Well add code to the `else` block in a moment to respond
to all other requests.
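The matching described here boils down to a byte-string prefix check. A minimal sketch (the `is_root_request` helper is illustrative; in the listing the check appears inline in `handle_connection`):

```rust
// Check whether a raw request buffer is a well-formed GET for `/`.
// `b"..."` produces a byte string, so it can be compared directly
// against the raw bytes read from the stream.
fn is_root_request(buffer: &[u8]) -> bool {
    let get = b"GET / HTTP/1.1\r\n";
    buffer.starts_with(get)
}

fn main() {
    assert!(is_root_request(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n"));
    assert!(!is_root_request(b"GET /something-else HTTP/1.1\r\n\r\n"));
    println!("request matching works as expected");
}
```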
If you run this code and request `127.0.0.1:8080`, youll get the HTML thats
in *hello.html*. If you make any other request, such as
`127.0.0.1:8080/something-else`, youll get a connection error like we saw when
Run this code now and request `127.0.0.1:7878`, and you should get the HTML in
*hello.html*. If you make any other request, such as
`127.0.0.1:7878/something-else`, youll get a connection error like we saw when
running the code in Listing 20-1 and Listing 20-2.
Lets add code to the `else` block as shown in Listing 20-7 to return a
response with the status code `404`, which signals that the content for the
request was not found. Well also return HTML for a page to render in the
browser indicating as such to the end user:
Now lets add the code in Listing 20-7 to the `else` block to return a response
with the status code `404`, which signals that the content for the request was
not found. Well also return some HTML for a page to render in the browser
indicating as such to the end user:
<span class="filename">Filename: src/main.rs</span>
@ -496,11 +540,11 @@ browser indicating as such to the end user:
<span class="caption">Listing 20-7: Responding with status code `404` and an
error page if anything other than `/` was requested</span>
Here, our response has a status line with status code `404` and the reason phrase
`NOT FOUND`. We still arent returning any headers, and the body of the
response will be the HTML in the file *404.html*. Also create a *404.html* file
next to *hello.html* for the error page; again feel free to use any HTML youd
like or use the example HTML in Listing 20-8:
Here, our response has a status line with status code `404` and the reason
phrase `NOT FOUND`. Were still not returning headers, and the body of the
response will be the HTML in the file *404.html*. Youll need to create a
*404.html* file next to *hello.html* for the error page; again feel free to use
any HTML youd like or use the example HTML in Listing 20-8:
<span class="filename">Filename: 404.html</span>
@ -521,18 +565,20 @@ like or use the example HTML in Listing 20-8:
<span class="caption">Listing 20-8: Sample content for the page to send back
with any `404` response</span>
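Since this diff also suppresses Listing 20-7, the `else` branch described above can be sketched roughly like this; the `not_found_response` helper and the stand-in file name are illustrative, and writing to the stream is again omitted:

```rust
use std::fs::File;
use std::io::prelude::*;

// Build the 404 response: status line plus the contents of the error page.
fn not_found_response() -> String {
    let status_line = "HTTP/1.1 404 NOT FOUND\r\n\r\n";
    let mut file = File::open("404_example.html").unwrap();
    let mut contents = String::new();
    file.read_to_string(&mut contents).unwrap();
    format!("{}{}", status_line, contents)
}

fn main() {
    // Stand-in for 404.html so the example is self-contained.
    std::fs::write("404_example.html", "<h1>Oops!</h1>").unwrap();

    let response = not_found_response();
    assert!(response.starts_with("HTTP/1.1 404 NOT FOUND\r\n\r\n"));
    assert!(response.ends_with("<h1>Oops!</h1>"));
}
```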
With these changes, try running your server again. Requesting `127.0.0.1:8080`
With these changes, try running your server again. Requesting `127.0.0.1:7878`
should return the contents of *hello.html*, and any other request, like
`127.0.0.1:8080/foo`, should return the error HTML from *404.html*!
`127.0.0.1:7878/foo`, should return the error HTML from *404.html*!
Theres a lot of repetition between the code in the `if` and the `else` blocks:
theyre both reading files and writing the contents of the files to the stream.
The only differences between the two cases are the status line and the
filename. Lets pull those differences out into an `if` and `else` of one line
### A Touch of Refactoring
At the moment our `if` and `else` blocks have a lot of repetition: theyre both
reading files and writing the contents of the files to the stream. The only
differences are the status line and the filename. Lets make our code more
concise by pulling those differences out into an `if` and `else` of one line
each that will assign the values of the status line and the filename to
variables; we can then use those variables unconditionally in the code to read
the file and write the response. The resulting code after this refactoring is
shown in Listing 20-9:
the file and write the response. The resulting code after replacing the large
`if` and `else` blocks is shown in Listing 20-9:
<span class="filename">Filename: src/main.rs</span>
@ -570,22 +616,22 @@ fn handle_connection(mut stream: TcpStream) {
<span class="caption">Listing 20-9: Refactoring so that the `if` and `else`
blocks only contain the code that differs between the two cases</span>
Here, the only thing the `if` and `else` blocks do is return the appropriate
values for the status line and filename in a tuple; we then use destructuring
to assign these two values to `status_line` and `filename` using a pattern in
the `let` statement like we discussed in Chapter 18.
Now the `if` and `else` blocks only return the appropriate values for the
status line and filename in a tuple; we then use destructuring to assign these
two values to `status_line` and `filename` using a pattern in the `let`
statement like we discussed in Chapter 18.
The duplicated code to read the file and write the response is now outside the
`if` and `else` blocks, and uses the `status_line` and `filename` variables.
This makes it easier to see exactly whats different between the two cases, and
makes it so that we only have one place to update the code if we want to change
how the file reading and response writing works. The behavior of the code in
Listing 20-9 will be exactly the same as that in Listing 20-8.
The previously duplicated code is now outside the `if` and `else` blocks and
uses the `status_line` and `filename` variables. This makes it easier to see
exactly whats different between the two cases, and means we have only one
place to update the code if we want to change how the file reading and response
writing works. The behavior of the code in Listing 20-9 will be exactly the
same as that in Listing 20-8.
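The core of that refactoring is a single `if`/`else` expression whose tuple value gets destructured. A runnable approximation (the `choose_response` helper is illustrative; in the listing the expression appears inline):

```rust
// Return the status line and filename that differ between the two cases.
fn choose_response(buffer: &[u8]) -> (&'static str, &'static str) {
    let get = b"GET / HTTP/1.1\r\n";
    if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    }
}

fn main() {
    // Destructuring assigns both values in one `let`, as in Chapter 18.
    let (status_line, filename) = choose_response(b"GET / HTTP/1.1\r\n");
    assert_eq!(status_line, "HTTP/1.1 200 OK\r\n\r\n");
    assert_eq!(filename, "hello.html");

    let (status_line, filename) = choose_response(b"GET /foo HTTP/1.1\r\n");
    assert_eq!(status_line, "HTTP/1.1 404 NOT FOUND\r\n\r\n");
    assert_eq!(filename, "404.html");
}
```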
Awesome! We have a simple little web server in about 40 lines of Rust code that
responds to one request with a page of content and responds to all other
requests with a `404` response.
Since this server runs in a single thread, though, it can only serve one
request at a time. Lets see how that can be a problem by simulating some
slow requests.
Currently our server runs in a single thread, meaning it can only serve one
request at a time. Lets see how that can be a problem by simulating some slow
requests, and then fix it so our server can handle multiple requests at once.

File diff suppressed because it is too large


@ -1,110 +0,0 @@
## How Slow Requests Affect Throughput
Right now, the server will process each request in turn. That works for
services like ours that arent expected to get very many requests, but as
applications get more complex, this sort of serial execution isnt optimal.
Because our current program processes connections sequentially, it wont
process a second connection until its completed processing the first. If we
get one request that takes a long time to process, requests coming in during
that time will have to wait until the long request is finished, even if the new
requests can be processed quickly. Lets see this in action.
### Simulating a Slow Request in the Current Server Implementation
Lets see the effect of a request that takes a long time to process on requests
made to our current server implementation. Listing 20-10 shows the code to
respond to another request, `/sleep`, that will cause the server to sleep for
five seconds before responding. This will simulate a slow request so that we
can see that our server processes requests serially.
<span class="filename">Filename: src/main.rs</span>
```rust
use std::thread;
use std::time::Duration;
# use std::io::prelude::*;
# use std::net::TcpStream;
# use std::fs::File;
// --snip--
fn handle_connection(mut stream: TcpStream) {
# let mut buffer = [0; 512];
# stream.read(&mut buffer).unwrap();
// --snip--
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
// --snip--
}
```
<span class="caption">Listing 20-10: Simulating a slow request by recognizing
`/sleep` and sleeping for 5 seconds</span>
This code is a bit messy, but its good enough for our simulation purposes! We
created a second request `sleep`, whose data well recognize. We added an `else
if` after the `if` block to check for the request to `/sleep`, and when we see
that request, well sleep for five seconds before rendering the hello page.
You can really see how primitive our server is here; real libraries would
handle the recognition of multiple requests in a less verbose way!
Start the server with `cargo run`, and then open up two browser windows: one
for `http://localhost:8080/` and one for `http://localhost:8080/sleep`. If
you hit `/` a few times, as before, youll see it respond quickly. But if you
hit `/sleep`, and then load up `/`, youll see that `/` waits until `sleep`
has slept for its full five seconds before going on.
There are multiple ways we could change how our web server works in order to
avoid having all requests back up behind a slow request; the one were going to
implement is a thread pool.
### Improving Throughput with a Thread Pool
A *thread pool* is a group of spawned threads that are ready to handle some
task. When the program receives a new task, one of the threads in the pool will
be assigned the task and will go off and process it. The remaining threads in
the pool are available to handle any other tasks that come in while the first
thread is processing. When the first thread is done processing its task, it
gets returned to the pool of idle threads ready to handle a new task.
A thread pool will allow us to process connections concurrently: we can start
processing a new connection before an older connection is finished. This
increases the throughput of our server.
Heres what were going to implement: instead of waiting for each request to
process before starting on the next one, well send the processing of each
connection to a different thread. The threads will come from a pool of four
threads that well spawn when we start our program. The reason were limiting
the number of threads to a small number is that if we created a new thread for
each request as the requests come in, someone making ten million requests to
our server could create havoc by using up all of our servers resources and
grinding the processing of all requests to a halt.
Rather than spawning unlimited threads, well have a fixed number of threads
waiting in the pool. As requests come in, well send the requests to the pool
for processing. The pool will maintain a queue of incoming requests. Each of
the threads in the pool will pop a request off of this queue, handle the
request, and then ask the queue for another request. With this design, we can
process `N` requests concurrently, where `N` is the number of threads. This
still means that `N` long-running requests can cause requests to back up in the
queue, but weve increased the number of long-running requests we can handle
before that point from one to `N`.
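The queue-of-requests design described above can be sketched with an `mpsc` channel whose single receiving end is shared by `N` worker threads. This anticipates the implementation the chapter builds later; the names here are illustrative, and note that modern Rust can call a `Box<dyn FnOnce()>` directly, whereas at the time this chapter was written a workaround was needed:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// A boxed closure, sent through the channel as one "request".
type Job = Box<dyn FnOnce() + Send + 'static>;

fn main() {
    let (sender, receiver) = mpsc::channel::<Job>();
    // All workers share the single receiving end of the channel.
    let receiver = Arc::new(Mutex::new(receiver));

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || loop {
                // Lock only long enough to pop one job off the queue.
                let message = receiver.lock().unwrap().recv();
                match message {
                    Ok(job) => job(),
                    Err(_) => break, // channel closed: no more jobs
                }
            })
        })
        .collect();

    let counter = Arc::new(AtomicUsize::new(0));
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        sender
            .send(Box::new(move || {
                counter.fetch_add(1, Ordering::SeqCst);
            }))
            .unwrap();
    }

    drop(sender); // closing the sending side lets the workers exit
    for handle in handles {
        handle.join().unwrap();
    }
    assert_eq!(counter.load(Ordering::SeqCst), 10);
    println!("10 jobs processed by 4 workers");
}
```

Ten jobs get processed by only four threads, which is exactly the throughput property the pool is meant to provide.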
This design is one of many ways to improve the throughput of our web server.
This isnt a book about web servers, though, so its the one were going to
cover. Other options are the fork/join model and the single threaded async I/O
model. If youre interested in this topic, you may want to read more about
other solutions and try to implement them in Rust; with a low-level language
like Rust, all of these options are possible.


@ -1,284 +0,0 @@
## Designing the Thread Pool Interface
Lets talk about what using the pool should look like. The authors often find
that when trying to design some code, writing the client interface first can
really help guide your design. Write the API of the code to be structured in
the way youd want to call it, then implement the functionality within that
structure rather than implementing the functionality then designing the public
API.
Similar to how we used Test Driven Development in the project in Chapter 12,
were going to use Compiler Driven Development here. Were going to write the
code that calls the functions we wish we had, then well lean on the compiler
to tell us what we should change next. The compiler error messages will guide
our implementation.
### Code Structure if We Could Use `thread::spawn`
First, lets explore what the code to create a new thread for every connection
could look like. This isnt our final plan due to the problems with potentially
spawning an unlimited number of threads that we talked about earlier, but its
a start. Listing 20-11 shows the changes to `main` to spawn a new thread to
handle each stream within the `for` loop:
<span class="filename">Filename: src/main.rs</span>
```rust,no_run
# use std::thread;
# use std::io::prelude::*;
# use std::net::TcpListener;
# use std::net::TcpStream;
#
fn main() {
let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| {
handle_connection(stream);
});
}
}
# fn handle_connection(mut stream: TcpStream) {}
```
<span class="caption">Listing 20-11: Spawning a new thread for each
stream</span>
As we learned in Chapter 16, `thread::spawn` will create a new thread and then
run the code in the closure in it. If you run this code and load `/sleep` and
then `/` in two browser tabs, youll indeed see the request to `/` doesnt have
to wait for `/sleep` to finish. But as we mentioned, this will eventually
overwhelm the system since were making new threads without any limit.
### Creating a Similar Interface for `ThreadPool`
We want our thread pool to work in a similar, familiar way so that switching
from threads to a thread pool doesnt require large changes to the code we want
to run in the pool. Listing 20-12 shows the hypothetical interface for a
`ThreadPool` struct wed like to use instead of `thread::spawn`:
<span class="filename">Filename: src/main.rs</span>
```rust,no_run
# use std::thread;
# use std::io::prelude::*;
# use std::net::TcpListener;
# use std::net::TcpStream;
# struct ThreadPool;
# impl ThreadPool {
# fn new(size: u32) -> ThreadPool { ThreadPool }
# fn execute<F>(&self, f: F)
# where F: FnOnce() + Send + 'static {}
# }
#
fn main() {
let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
# fn handle_connection(mut stream: TcpStream) {}
```
<span class="caption">Listing 20-12: How we want to be able to use the
`ThreadPool` were going to implement</span>
We use `ThreadPool::new` to create a new thread pool with a configurable number
of threads, in this case four. Then, in the `for` loop, `pool.execute` will
work in a similar way to `thread::spawn`.
### Compiler Driven Development to Get the API Compiling
Go ahead and make the changes in Listing 20-12 to *src/main.rs*, and lets use
the compiler errors to drive our development. Heres the first error we get:
```text
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve. Use of undeclared type or module `ThreadPool`
--> src\main.rs:10:16
|
10 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^^^^^^ Use of undeclared type or module
`ThreadPool`
error: aborting due to previous error
```
Great, we need a `ThreadPool`. Lets switch the `hello` crate from a binary
crate to a library crate to hold our `ThreadPool` implementation, since the
thread pool implementation will be independent of the particular kind of work
that were doing in our web server. Once weve got the thread pool library
written, we could use that functionality to do whatever work we want to do, not
just serve web requests.
So create *src/lib.rs* that contains the simplest definition of a `ThreadPool`
struct that we can have for now:
<span class="filename">Filename: src/lib.rs</span>
```rust
pub struct ThreadPool;
```
Then create a new directory, *src/bin*, and move the binary crate rooted in
*src/main.rs* into *src/bin/main.rs*. This will make the library crate be the
primary crate in the *hello* directory; we can still run the binary in
*src/bin/main.rs* using `cargo run` though. After moving the *main.rs* file,
edit it to bring the library crate in and bring `ThreadPool` into scope by
adding this at the top of *src/bin/main.rs*:
<span class="filename">Filename: src/bin/main.rs</span>
```rust,ignore
extern crate hello;
use hello::ThreadPool;
```
And try again in order to get the next error that we need to address:
```text
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
error: no associated item named `new` found for type `hello::ThreadPool` in the
current scope
--> src\main.rs:13:16
|
13 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^^^^^^
|
```
Cool, the next thing is to create an associated function named `new` for
`ThreadPool`. We also know that `new` needs to have one parameter that can
accept `4` as an argument, and `new` should return a `ThreadPool` instance.
Lets implement the simplest `new` function that will have those
characteristics:
<span class="filename">Filename: src/lib.rs</span>
```rust
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: u32) -> ThreadPool {
ThreadPool
}
}
```
We picked `u32` as the type of the `size` parameter, since we know that a
negative number of threads makes no sense. `u32` is a solid default. Once we
actually implement `new` for real, well reconsider whether this is the right
choice for what the implementation needs, but for now, were just working
through compiler errors.
Lets check the code again:
```text
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`, #[warn(unused_variables)] on by default
--> src/lib.rs:4:16
|
4 | pub fn new(size: u32) -> ThreadPool {
| ^^^^
error: no method named `execute` found for type `hello::ThreadPool` in the
current scope
--> src/main.rs:18:14
|
18 | pool.execute(|| {
| ^^^^^^^
```
Okay, a warning and an error. Ignoring the warning for a moment, the error is
because we dont have an `execute` method on `ThreadPool`. Lets define one,
and we need it to take a closure. If you remember from Chapter 13, we can take
closures as arguments with three different traits: `Fn`, `FnMut`, and `FnOnce`.
What kind of closure should we use? Well, we know were going to end up doing
something similar to `thread::spawn`; what bounds does the signature of
`thread::spawn` have on its argument? Lets look at the documentation, which
says:
```rust,ignore
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static
```
`F` is the parameter we care about here; `T` is related to the return value and
were not concerned with that. Given that `spawn` uses `FnOnce` as the trait
bound on `F`, its probably what we want as well, since well eventually be
passing the argument we get in `execute` to `spawn`. We can be further
confident that `FnOnce` is the trait that we want to use since the thread for
running a request is only going to execute that requests closure one time.
`F` also has the trait bound `Send` and the lifetime bound `'static`, which
also make sense for our situation: we need `Send` to transfer the closure from
one thread to another, and `'static` because we dont know how long the thread
will execute. Lets create an `execute` method on `ThreadPool` that will take a
generic parameter `F` with these bounds:
<span class="filename">Filename: src/lib.rs</span>
```rust
# pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
}
}
```
The `FnOnce` trait still needs the `()` after it since this `FnOnce` is
representing a closure that takes no parameters and doesnt return a value.
Just like function definitions, the return type can be omitted from the
signature, but even if we have no parameters, we still need the parentheses.
Again, since were working on getting the interface compiling, were adding the
simplest implementation of the `execute` method, which does nothing. Lets
check again:
```text
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`, #[warn(unused_variables)] on by default
--> src/lib.rs:4:16
|
4 | pub fn new(size: u32) -> ThreadPool {
| ^^^^
warning: unused variable: `f`, #[warn(unused_variables)] on by default
--> src/lib.rs:8:30
|
8 | pub fn execute<F>(&self, f: F)
| ^
```
Only warnings now! It compiles! Note that if you run `cargo run` and make a
request in the browser, though, youll see the errors in the browser again that
we saw at the beginning of the chapter. Our library isnt actually calling the
closure passed to `execute` yet!
> A saying you might hear about languages with strict compilers like Haskell
> and Rust is “if the code compiles, it works.” This is a good time to remember
> that this is just a phrase and a feeling people sometimes have, its not
> actually universally true. Our project compiles, but it does absolutely
> nothing! If we were building a real, complete project, this would be a great
> time to start writing unit tests to check that the code compiles *and* has
> the behavior we want.


@ -1,23 +1,26 @@
## Graceful Shutdown and Cleanup
The code in Listing 20-21 is responding to requests asynchronously through the
use of a thread pool, as we intended. We get some warnings about fields that
were not using in a direct way, which are a reminder that were not cleaning
anything up. When we use <span class="keystroke">ctrl-C</span> to halt the main
thread, all the other threads are stopped immediately as well, even if theyre
in the middle of serving a request.
use of a thread pool, as we intended. We get some warnings about the `workers`,
`id`, and `thread` fields that were not using in a direct way that reminds us
were not cleaning anything up. When we use the less elegant <span
class="keystroke">ctrl-C</span> method to halt the main thread, all other
threads are stopped immediately as well, even if theyre in the middle of
serving a request.
Were now going to implement the `Drop` trait for `ThreadPool` to call `join`
on each of the threads in the pool so that the threads will finish the requests
theyre working on. Then well implement a way for the `ThreadPool` to tell the
threads they should stop accepting new requests and shut down. To see this code
in action, well modify our server to only accept two requests before
gracefully shutting down its thread pool.
Were now going to implement the `Drop` trait to call `join` on each of the
threads in the pool so they can finish the requests theyre working on before
closing. Then well implement a way to tell the threads they should stop
accepting new requests and shut down. To see this code in action, well modify
our server to only accept two requests before gracefully shutting down its
thread pool.
### Implementing the `Drop` Trait on `ThreadPool`
Lets start with implementing `Drop` for our thread pool. When the pool is
dropped, we should join on all of our threads to make sure they finish their
work. Listing 20-22 shows a first attempt at a `Drop` implementation; this code
wont quite work yet:
dropped, our threads should all join to make sure they finish their work.
Listing 20-23 shows a first attempt at a `Drop` implementation; this code wont
quite work yet:
<span class="filename">Filename: src/lib.rs</span>
@ -33,14 +36,15 @@ impl Drop for ThreadPool {
}
```
<span class="caption">Listing 20-22: Joining each thread when the thread pool
<span class="caption">Listing 20-23: Joining each thread when the thread pool
goes out of scope</span>
We loop through each of the thread pool `workers`, using `&mut` because `self`
is itself a mutable reference and we also need to be able to mutate `worker`.
We print out a message saying that this particular worker is shutting down, and
then we call `join` on that workers thread. If the call to `join` fails, we
`unwrap` the error to panic and go into an ungraceful shutdown.
First we loop through each of the thread pool `workers`. We use `&mut` for this
because `self` is itself a mutable reference and we also need to be able to
mutate `worker`. For each worker, we print a message saying that this
particular worker is shutting down, and then we call `join` on that workers
thread. If the call to `join` fails, we use `unwrap` to make Rust panic and go
into an ungraceful shutdown.
Heres the error we get if we compile this code:
@ -52,15 +56,16 @@ error[E0507]: cannot move out of borrowed content
| ^^^^^^ cannot move out of borrowed content
```
Because we only have a mutable borrow of each `worker`, we cant call `join`:
`join` takes ownership of its argument. In order to solve this, we need a way
to move the `thread` out of the `Worker` instance that owns `thread` so that
`join` can consume the thread. We saw a way to do this in Listing 17-15: if the
`Worker` holds an `Option<thread::JoinHandle<()>>` instead, we can call the
`take` method on the `Option` to move the value out of the `Some` variant and
leave a `None` variant in its place. In other words, a `Worker` that is running
will have a `Some` variant in `thread`, and when we want to clean up a worker,
well replace `Some` with `None` so the worker doesnt have a thread to run.
This tells us we cant call `join` because we only have a mutable borrow of
each `worker`, and `join` takes ownership of its argument. In order to solve
this, we need a way to move the thread out of the `Worker` instance that owns
`thread` so that `join` can consume the thread. We saw a way to do this in
Listing 17-15: if `Worker` holds an `Option<thread::JoinHandle<()>>` instead, we
can call the `take` method on the `Option` to move the value out of the `Some`
variant and leave a `None` variant in its place. In other words, a `Worker`
that is running will have a `Some` variant in `thread`, and when we want to
clean up a worker, well replace `Some` with `None` so the worker doesnt have
a thread to run.
So we know we want to update the definition of `Worker` like this:
@ -74,11 +79,11 @@ struct Worker {
}
```
Now lets lean on the compiler to find the other places that need to change. We
get two errors:
Now lets lean on the compiler to find the other places that need to change.
Checking this code, we get two errors:
```text
error: no method named `join` found for type
error[E0599]: no method named `join` found for type
`std::option::Option<std::thread::JoinHandle<()>>` in the current scope
--> src/lib.rs:65:27
|
@ -86,18 +91,22 @@ error: no method named `join` found for type
| ^^^^
error[E0308]: mismatched types
--> src/lib.rs:89:21
--> src/lib.rs:89:13
|
89 | thread,
| ^^^^^^ expected enum `std::option::Option`, found
struct `std::thread::JoinHandle`
| ^^^^^^
| |
| expected enum `std::option::Option`, found struct
`std::thread::JoinHandle`
| help: try using a variant of the expected type: `Some(thread)`
|
= note: expected type `std::option::Option<std::thread::JoinHandle<()>>`
found type `std::thread::JoinHandle<_>`
```
The second error is pointing to the code at the end of `Worker::new`; we need
to wrap the `thread` value in `Some` when we create a new `Worker`:
Lets address the second error, which points to the code at the end of
`Worker::new`; we need to wrap the `thread` value in `Some` when we create a
new `Worker`. Make the following changes to fix this:
<span class="filename">Filename: src/lib.rs</span>
@ -114,9 +123,9 @@ impl Worker {
}
```
The first error is in our `Drop` implementation, and we mentioned that well be
calling `take` on the `Option` value to move `thread` out of `worker`. Heres
what that looks like:
The first error is in our `Drop` implementation. We mentioned earlier that we
intended to call `take` on the `Option` value to move `thread` out of `worker`.
The following changes will do so:
<span class="filename">Filename: src/lib.rs</span>
@ -136,21 +145,23 @@ impl Drop for ThreadPool {
As we saw in Chapter 17, the `take` method on `Option` takes the `Some` variant
out and leaves `None` in its place. Were using `if let` to destructure the
`Some` and get the thread, then call `join` on the thread. If a workers thread
is already `None`, then we know this worker has already had its thread cleaned
up so we dont do anything in that case.
`Some` and get the thread, then we call `join` on the thread. If a workers
thread is already `None`, we know that worker has already had its thread
cleaned up, so nothing happens in that case.
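The `take`-then-`join` pattern described above can be demonstrated in miniature. This sketch uses simplified `Pool` and `Worker` types (the chapter's `ThreadPool` also owns a channel sender), and its threads finish immediately rather than looping, so `join` returns right away:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

struct Pool {
    workers: Vec<Worker>,
}

impl Drop for Pool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            // `take` moves the handle out of the Option, leaving `None`,
            // so `join` can consume it even through a `&mut Worker`.
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

fn main() {
    let finished = Arc::new(AtomicUsize::new(0));
    let workers: Vec<Worker> = (0..2)
        .map(|id| {
            let finished = Arc::clone(&finished);
            Worker {
                id,
                thread: Some(thread::spawn(move || {
                    finished.fetch_add(1, Ordering::SeqCst);
                })),
            }
        })
        .collect();

    drop(Pool { workers });
    // Joining in `drop` guarantees both threads ran to completion.
    assert_eq!(finished.load(Ordering::SeqCst), 2);
}
```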
### Signaling to the Threads to Stop Listening for Jobs
With this, our code compiles without any warnings. Bad news though, this code
doesnt function the way we want it to yet. The key is the logic in the
closures that the spawned threads of the `Worker` instances run: calling `join`
wont shut down the threads since they `loop` forever looking for jobs. If we
try to drop our `ThreadPool` with this implementation, the main thread will
block forever waiting for the first thread to finish.
closures run by the threads of the `Worker` instances: at the moment we call
`join`, but that wont shut down the threads because they `loop` forever looking
for jobs. If we try to drop our `ThreadPool` with this implementation, the main
thread will block forever waiting for the first thread to finish.
To fix this, were going to modify the threads to listen for either a `Job` to
run or a signal that they should stop listening and exit the infinite loop. So
instead of `Job` instances, our channel will send one of these two enum
variants:
To fix this, were going to modify the threads so they listen for either a
`Job` to run or a signal that they should stop listening and exit the infinite
loop. Instead of `Job` instances, then, our channel will send one of these two
enum variants:
<span class="filename">Filename: src/lib.rs</span>
@ -167,7 +178,7 @@ thread should run, or it will be a `Terminate` variant that will cause the
thread to exit its loop and stop.
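As a standalone sketch, the enum and a worker's handling of it might look like this. `Job` here is the boxed-closure alias the chapter defines; note that on current Rust a boxed `FnOnce` closure can be called directly, so this sketch sidesteps the `FnBox` workaround that was needed when the chapter was written:

```rust
// The same boxed-closure alias the chapter uses for jobs.
type Job = Box<dyn FnOnce() + Send + 'static>;

enum Message {
    NewJob(Job),
    Terminate,
}

fn main() {
    let messages = vec![
        Message::NewJob(Box::new(|| println!("running a job"))),
        Message::Terminate,
    ];
    for message in messages {
        match message {
            // Run the closure we were handed.
            Message::NewJob(job) => job(),
            // Stop looking for more work.
            Message::Terminate => {
                println!("worker would break out of its loop");
                break;
            }
        }
    }
}
```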
We need to adjust the channel to use values of type `Message` rather than type
`Job`, as shown in Listing 20-23:
`Job`, as shown in Listing 20-24:
<span class="filename">Filename: src/lib.rs</span>
@ -181,13 +192,6 @@ pub struct ThreadPool {
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
// --snip--
}
pub fn execute<F>(&self, f: F)
where
@ -232,20 +236,21 @@ impl Worker {
}
```
<span class="caption">Listing 20-23: Sending and receiving `Message` values and
<span class="caption">Listing 20-24: Sending and receiving `Message` values and
exiting the loop if a `Worker` receives `Message::Terminate`</span>
We need to change `Job` to `Message` in the definition of `ThreadPool`, in
`ThreadPool::new` where we create the channel, and in the signature of
`Worker::new`. The `execute` method of `ThreadPool` needs to send jobs wrapped
in the `Message::NewJob` variant. Then, in `Worker::new` where we receive a
`Message` from the channel, well process the job if we get the `NewJob`
variant and break out of the loop if we get the `Terminate` variant.
To incorporate the `Message` enum we need to change `Job` to `Message` in two
places: the definition of `ThreadPool` and the signature of `Worker::new`. The
`execute` method of `ThreadPool` needs to send jobs wrapped in the
`Message::NewJob` variant. Then, in `Worker::new` where a `Message` is received
from the channel, the job will be processed if the `NewJob` variant is
received, and the thread will break out of the loop if the `Terminate` variant
is received.
With these changes, the code will compile again and continue to function in the
same way as it has been. Well get a warning, though, because we arent using
the `Terminate` variant in any messages. Lets change our `Drop` implementation
to look like Listing 20-24:
With these changes, the code will compile and continue to function in the same
way as it has been. We will get a warning, though, because we arent creating
any messages of the `Terminate` variety. Lets fix this by changing our `Drop`
implementation to look like Listing 20-25:
<span class="filename">Filename: src/lib.rs</span>
@ -271,40 +276,39 @@ impl Drop for ThreadPool {
}
```
<span class="caption">Listing 20-24: Sending `Message::Terminate` to the
<span class="caption">Listing 20-25: Sending `Message::Terminate` to the
workers before calling `join` on each worker thread</span>
Were now iterating over the workers twice, once to send one `Terminate`
message for each worker, and once to call `join` on each workers thread. If we
tried to send a message and join immediately in the same loop, its not
guaranteed that the worker in the current iteration will be the one that gets
the message from the channel.
tried to send a message and `join` immediately in the same loop, we couldnt
guarantee that the worker in the current iteration would be the one to get the
message from the channel.
To understand better why we need two separate loops, imagine a scenario with
two workers. If we iterated through each worker in one loop, on the first
iteration where `worker` is the first worker, wed send a terminate message
down the channel and call `join` on the first workers thread. If the first
worker was busy processing a request at that moment, the second worker would
pick up the terminate message from the channel and shut down. Were waiting on
the first worker to shut down, but it never will since the second thread picked
up the terminate message. Were now blocking forever waiting for the first
worker to shut down, and well never send the second message to terminate.
Deadlock!
To better understand why we need two separate loops, imagine a scenario with
two workers. If we used a single loop to iterate through each worker, on the
first iteration a terminate message would be sent down the channel and `join`
called on the first workers thread. If that first worker was busy processing a
request at that moment, the second worker would pick up the terminate message
from the channel and shut down. Wed be left waiting on the first worker to
shut down, but it never will because the second thread picked up the terminate
message. Deadlock!
To prevent this, we first put all of our `Terminate` messages on the channel,
and then we join on all the threads. Because each worker will stop receiving
requests on the channel once it gets a terminate message, we can be sure that
if we send the same number of terminate messages as there are workers, each
worker will receive a terminate message before we call `join` on its thread.
To prevent this, we first put all of our `Terminate` messages on the channel in
one loop, and then we join on all the threads in another loop. Each worker will
stop receiving requests on the channel once it gets a terminate message,
meaning we can be sure that if we send the same number of terminate messages as
there are workers, each worker will receive a terminate message before `join`
is called on its thread.
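The two-loop shutdown can be demonstrated on its own. This is a stripped-down sketch, assuming a single-variant `Message` enum and two workers sharing one receiver through `Arc<Mutex<...>>`, not the chapter's full pool:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

enum Message {
    Terminate,
}

fn main() {
    let (sender, receiver) = mpsc::channel();
    let receiver = Arc::new(Mutex::new(receiver));

    // Two workers that each loop until they receive Terminate.
    let handles: Vec<_> = (0..2)
        .map(|id| {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || loop {
                match receiver.lock().unwrap().recv().unwrap() {
                    Message::Terminate => {
                        println!("worker {} terminating", id);
                        break;
                    }
                }
            })
        })
        .collect();

    // First loop: queue one Terminate per worker, so every worker can exit...
    for _ in &handles {
        sender.send(Message::Terminate).unwrap();
    }
    // ...second loop: now joining cannot deadlock, because each worker is
    // guaranteed a terminate message regardless of which one it picks up.
    for handle in handles {
        handle.join().unwrap();
    }
    println!("all workers joined");
}
```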
In order to see this code in action, lets modify `main` to only accept two
requests before gracefully shutting the server down as shown in Listing 20-25:
requests before gracefully shutting the server down as shown in Listing 20-26:
<span class="filename">Filename: src/bin/main.rs</span>
```rust,ignore
fn main() {
let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
@ -319,20 +323,20 @@ fn main() {
}
```
<span class="caption">Listing 20-25: Shut down the server after serving two
<span class="caption">Listing 20-26: Shut down the server after serving two
requests by exiting the loop</span>
Only serving two requests isnt behavior youd like a production web server to
have, but this will let us see the graceful shutdown and cleanup working since
we wont be stopping the server with <span class="keystroke">ctrl-C</span>.
You wouldnt want a real-world web server to shut down after serving only two
requests; this just demonstrates the graceful shutdown and cleanup in working
order.
The `.take(2)` we added to `listener.incoming()` artificially limits the
iteration to the first 2 items at most. This combinator works for any
implementation of the `Iterator` trait. The `ThreadPool` will go out of scope
at the end of `main`, and well see the `drop` implementation run.
The `take` method is defined in the `Iterator` trait, and limits the iteration
to the first 2 items at most. The `ThreadPool` will go out of scope at the end
of `main`, and well see the `drop` implementation run.
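A quick illustration of `take` on an arbitrary iterator, here an unbounded range:

```rust
fn main() {
    // `take` caps any iterator at n items; this is how the server stops
    // accepting connections after two requests.
    let first_two: Vec<u32> = (1..).take(2).collect();
    assert_eq!(first_two, vec![1, 2]);
    println!("{:?}", first_two);
}
```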
Start the server with `cargo run`, and make three requests. The third request
should error, and in your terminal you should see output that looks like:
should error, and in your terminal you should see output that looks similar to
this:
```text
$ cargo run
@ -354,24 +358,24 @@ Shutting down worker 2
Shutting down worker 3
```
You may get a different ordering, of course. We can see how this works from the
messages: workers zero and three got the first two requests, and then on the
third request, we stop accepting connections. When the `ThreadPool` goes out of
scope at the end of `main`, its `Drop` implementation kicks in, and the pool
tells all workers to terminate. The workers each print a message when they see
the terminate message, and then the thread pool calls `join` to shut down each
worker thread.
You may see a different ordering of workers and messages printed. We can see
how this works from the messages: workers zero and three got the first two
requests, and then on the third request the server stopped accepting
connections. When the `ThreadPool` goes out of scope at the end of `main`, its
`Drop` implementation kicks in, and the pool tells all workers to terminate.
The workers each print a message when they see the terminate message, and then
the thread pool calls `join` to shut down each worker thread.
One interesting aspect of this particular execution: notice that we sent the
terminate messages down the channel, and before any worker received the
messages, we tried to join worker zero. Worker zero had not yet gotten the
terminate message, so the main thread blocked waiting for worker zero to
finish. In the meantime, each of the workers received the termination messages.
Once worker zero finished, the main thread waited for the rest of the workers
to finish, and they had all received the termination message and were able to
shut down at that point.
One interesting aspect of this particular execution: notice that the
`ThreadPool` sent the terminate messages down the channel, and before any
worker received the messages, we tried to join worker 0. Worker 0 had not yet
gotten the terminate message, so the main thread blocked waiting for worker 0
to finish. In the meantime, each of the workers received the termination
messages. Once worker 0 finished, the main thread waited for the rest of the
workers to finish, and they had all received the termination message and were
able to shut down at that point.
Congrats! We now have completed our project, and we have a basic web server
Congrats! We have now completed our project, and we have a basic web server
that uses a thread pool to respond asynchronously. Were able to perform a
graceful shutdown of the server, which cleans up all the threads in the pool.
Heres the full code for reference:
@ -390,7 +394,7 @@ use std::thread;
use std::time::Duration;
fn main() {
let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
@ -561,7 +565,7 @@ project, here are some ideas:
- Add more documentation to `ThreadPool` and its public methods
- Add tests of the librarys functionality
- Change calls to `unwrap` to more robust error handling
- Use `ThreadPool` to perform some other task rather than serving web requests
- Use `ThreadPool` to perform some task other than serving web requests
- Find a thread pool crate on crates.io and implement a similar web server
using the crate instead and compare its API and robustness to the thread pool
we implemented
@ -570,6 +574,6 @@ project, here are some ideas:
Well done! Youve made it to the end of the book! Wed like to thank you for
joining us on this tour of Rust. Youre now ready to go out and implement your
own Rust projects or help with other peoples. Remember theres a community of
own Rust projects and help with other peoples. Remember theres a community of
other Rustaceans who would love to help you with any challenges you encounter
on your Rust journey.

View File

@ -1,248 +0,0 @@
## Creating the Thread Pool and Storing Threads
The warnings are because we arent doing anything with the parameters to `new`
and `execute`. Lets implement the bodies of both of these with the actual
behavior we want.
### Validating the Number of Threads in the Pool
To start, lets think about `new`. We mentioned before that we picked an
unsigned type for the `size` parameter since a pool with a negative number of
threads makes no sense. However, a pool with zero threads also makes no sense,
yet zero is a perfectly valid `u32`. Lets check that `size` is greater than
zero before we return a `ThreadPool` instance and panic if we get zero by using
the `assert!` macro as shown in Listing 20-13:
<span class="filename">Filename: src/lib.rs</span>
```rust
# pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: u32) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
}
```
<span class="caption">Listing 20-13: Implementing `ThreadPool::new` to panic if
`size` is zero</span>
Weve taken this opportunity to add some documentation for our `ThreadPool`
with doc comments. Note that we followed good documentation practices and added
a section that calls out the situations in which our function can panic as we
discussed in Chapter 14. Try running `cargo doc --open` and clicking on the
`ThreadPool` struct to see what the generated docs for `new` look like!
Instead of using the `assert!` macro as weve done here, we could have `new`
return a `Result`, like we did with `Config::new` in the I/O project in Listing
12-9. But weve decided that in this case, trying to create a thread pool
without any threads should be an unrecoverable error. If youre
feeling ambitious, try to write a version of `new` with this signature to see
how you feel about both versions:
```rust,ignore
fn new(size: u32) -> Result<ThreadPool, PoolCreationError> {
```
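One possible shape for this exercise, with `PoolCreationError` as a hypothetical error type of our own; the stub `ThreadPool` holds no threads, just enough to show the signature in use:

```rust
// Hypothetical error type for a failed pool construction.
#[derive(Debug)]
pub struct PoolCreationError;

pub struct ThreadPool;

impl ThreadPool {
    /// Fallible constructor: zero threads is an error rather than a panic.
    pub fn new(size: u32) -> Result<ThreadPool, PoolCreationError> {
        if size == 0 {
            return Err(PoolCreationError);
        }
        Ok(ThreadPool)
    }
}

fn main() {
    assert!(ThreadPool::new(0).is_err());
    assert!(ThreadPool::new(4).is_ok());
    println!("both cases behave as expected");
}
```

Whether a zero-size pool should be a recoverable `Err` or a panic is the design question the chapter is posing; both versions are defensible.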
### Storing Threads in the Pool
Now that we know we have a valid number of threads to store in the pool, we can
actually create that many threads and store them in the `ThreadPool` struct
before returning it.
This raises a question: how do we “store” a thread? Lets take another look at
the signature of `thread::spawn`:
```rust,ignore
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static
```
`spawn` returns a `JoinHandle<T>`, where `T` is the type thats returned from
the closure. Lets try using `JoinHandle` too and see what happens. In our
case, the closures were passing to the thread pool will handle the connection
and not return anything, so `T` will be the unit type `()`.
This wont compile yet, but lets consider the code shown in Listing 20-14.
Weve changed the definition of `ThreadPool` to hold a vector of
`thread::JoinHandle<()>` instances, initialized the vector with a capacity of
`size`, set up a `for` loop that will run some code to create the threads, and
returned a `ThreadPool` instance containing them:
<span class="filename">Filename: src/lib.rs</span>
```rust,ignore
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
pub fn new(size: u32) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool {
threads
}
}
// --snip--
}
```
<span class="caption">Listing 20-14: Creating a vector for `ThreadPool` to hold
the threads</span>
Weve brought `std::thread` into scope in the library crate, since were using
`thread::JoinHandle` as the type of the items in the vector in `ThreadPool`.
After we have a valid size, were creating a new vector that can hold `size`
items. We havent used `with_capacity` in this book yet; it does the same thing
as `Vec::new`, but with an important difference: it pre-allocates space in the
vector. Since we know that we need to store `size` elements in the vector,
doing this allocation up-front is slightly more efficient than only writing
`Vec::new`, since `Vec::new` resizes itself as elements get inserted. Since
weve created a vector the exact size that we need up front, no resizing of the
underlying vector will happen while we populate the items.
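The pre-allocation behavior is easy to observe. The standard library only guarantees the capacity is *at least* the requested size, so the sketch below asserts accordingly:

```rust
fn main() {
    let mut v: Vec<usize> = Vec::with_capacity(4);
    assert!(v.capacity() >= 4); // space reserved up front
    assert_eq!(v.len(), 0);     // but no elements yet

    let before = v.capacity();
    for i in 0..4 {
        v.push(i);
    }
    // Filling up to the reserved capacity never triggers a reallocation.
    assert_eq!(v.capacity(), before);
    println!("capacity stayed at {}", v.capacity());
}
```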
That is, if this code works, which it doesnt quite yet! If we check this code,
we get an error:
```text
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
error[E0308]: mismatched types
  --> src/lib.rs:70:46
|
70 | let mut threads = Vec::with_capacity(size);
| ^^^^ expected usize, found u32
error: aborting due to previous error
```
`size` is a `u32`, but `Vec::with_capacity` needs a `usize`. We have two
options here: we can change our functions signature, or we can cast the `u32`
as a `usize`. If you remember when we defined `new`, we didnt think too hard
about what number type made sense; we just chose one. Lets give it some more
thought now. Given that `size` is the length of a vector, `usize` makes a lot
of sense. They even almost share a name! Lets change the signature of `new`,
which will get the code in Listing 20-14 to compile:
```rust,ignore
fn new(size: usize) -> ThreadPool {
```
If we run `cargo check` again, well get a few more warnings, but it should
succeed.
We left a comment in the `for` loop in Listing 20-14 regarding the creation of
threads. How do we actually create threads? This is a tough question. What
should go in these threads? We dont know what work they need to do at this
point, since the `execute` method takes the closure and gives it to the pool.
Lets refactor slightly: instead of storing a vector of `JoinHandle<()>`
instances, lets create a new struct to represent the concept of a *worker*. A
worker will be what receives a closure in the `execute` method, and it will
take care of actually calling the closure. In addition to letting us store a
fixed `size` number of `Worker` instances that dont yet know about the
closures theyre going to be executing, we can also give each worker an `id` so
we can tell the different workers in the pool apart when logging or debugging.
Lets make these changes:
1. Define a `Worker` struct that holds an `id` and a `JoinHandle<()>`
2. Change `ThreadPool` to hold a vector of `Worker` instances
3. Define a `Worker::new` function that takes an `id` number and returns a
`Worker` instance with that `id` and a thread spawned with an empty closure,
which well fix soon
4. In `ThreadPool::new`, use the `for` loop counter to generate an `id`, create
a new `Worker` with that `id`, and store the worker in the vector
If youre up for a challenge, try implementing these changes on your own before
taking a look at the code in Listing 20-15.
Ready? Heres Listing 20-15 with one way to make these modifications:
<span class="filename">Filename: src/lib.rs</span>
```rust
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool {
workers
}
}
// --snip--
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker {
id,
thread,
}
}
}
```
<span class="caption">Listing 20-15: Modifying `ThreadPool` to hold `Worker`
instances instead of threads directly</span>
Weve chosen to change the name of the field on `ThreadPool` from `threads` to
`workers` since weve changed what were holding, which is now `Worker`
instances instead of `JoinHandle<()>` instances. We use the counter in the
`for` loop as an argument to `Worker::new`, and we store each new `Worker` in
the vector named `workers`.
The `Worker` struct and its `new` function are private since external code
(like our server in *src/bin/main.rs*) doesnt need to know the implementation
detail that were using a `Worker` struct within `ThreadPool`. The
`Worker::new` function uses the given `id` and stores a `JoinHandle<()>`
created by spawning a new thread using an empty closure.
This code compiles and stores the number of `Worker` instances we specified as
an argument to `ThreadPool::new`, but were *still* not processing the closure
we get in `execute`. Lets talk about how to do that next.

View File

@ -1,516 +0,0 @@
## Sending Requests to Threads Via Channels
The next problem to tackle is that our closures do absolutely nothing. Weve
been sidestepping the fact that we dont receive the closure we want to execute
until the call to `execute`, yet it feels as if we need to know the closures
when we create the `ThreadPool`.
Lets think about what we really want to do though: we want the `Worker`
structs that we just created to fetch jobs from a queue that the `ThreadPool`
holds, and run those jobs in a thread.
In Chapter 16, we learned about channels. Channels are a great way to
communicate between two threads, and theyre perfect for this use case. The
channel will function as the queue of jobs, and `execute` will send a job from
the `ThreadPool` to the `Worker` instances that are checking for jobs in the
thread theyve spawned. Heres the plan:
1. `ThreadPool` will create a channel and hold on to the sending side.
2. Each `Worker` will hold on to the receiving side of the channel.
3. A new `Job` struct will hold the closures we want to send down the channel.
4. The `execute` method of `ThreadPool` will send the job it wants
to execute down the sending side of the channel.
5. In a thread, the `Worker` will loop over its receiving side of the channel
and execute the closures of any jobs it receives.
Lets start by creating a channel in `ThreadPool::new` and holding the sending
side in the `ThreadPool` instance, as shown in Listing 20-16. `Job` is the type
of item were going to be sending down the channel; its a struct that doesnt
hold anything for now:
<span class="filename">Filename: src/lib.rs</span>
```rust
# use std::thread;
// --snip--
use std::sync::mpsc;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool {
workers,
sender,
}
}
// --snip--
}
#
# struct Worker {
# id: usize,
# thread: thread::JoinHandle<()>,
# }
#
# impl Worker {
# fn new(id: usize) -> Worker {
# let thread = thread::spawn(|| {});
#
# Worker {
# id,
# thread,
# }
# }
# }
```
<span class="caption">Listing 20-16: Modifying `ThreadPool` to store the
sending end of a channel that sends `Job` instances</span>
In `ThreadPool::new`, we create our new channel, and then have the pool hang on
to the sending end. This will successfully compile, still with warnings.
Lets try passing a receiving end of the channel into each worker when the
thread pool creates them. We know we want to use the receiving end of the
channel in the thread that the workers spawn, so were going to reference the
`receiver` parameter in the closure. The code shown here in Listing 20-17
wont quite compile yet:
<span class="filename">Filename: src/lib.rs</span>
```rust,ignore
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool {
workers,
sender,
}
}
// --snip--
}
// --snip--
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker {
id,
thread,
}
}
}
```
<span class="caption">Listing 20-17: Passing the receiving end of the channel
to the workers</span>
These are small and straightforward changes: we pass the receiving end of the
channel into `Worker::new` and then use it inside the closure.
If we try to check this, we get this error:
```text
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:27:42
|
27 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here in
previous iteration of loop
|
= note: move occurs because `receiver` has type
`std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
```
The code as written wont quite work since its trying to pass `receiver` to
multiple `Worker` instances. Recall from Chapter 16 that the channel
implementation provided by Rust is multiple *producer*, single *consumer*, so
we cant just clone the consuming end of the channel to fix this. Even if we
could, cloning isnt what wed want; sharing the single `receiver` between all
of the workers is the mechanism by which wed like to distribute the jobs
across the threads.
Additionally, taking a job off the channel queue involves mutating `receiver`,
so the threads need a safe way to share `receiver` and be allowed to modify it.
If the modifications werent thread-safe, we might get race conditions such as
two threads executing the same job if they both take the same job off the queue
at the same time.
So remembering the thread-safe smart pointers that we discussed in Chapter 16,
in order to share ownership across multiple threads and allow the threads to
mutate the value, we need to use `Arc<Mutex<T>>`. `Arc` will let multiple
workers own the receiver, and `Mutex` will make sure that only one worker is
getting a job from the receiver at a time. Listing 20-18 shows the changes we
need to make:
<span class="filename">Filename: src/lib.rs</span>
```rust
# use std::thread;
# use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
// --snip--
# pub struct ThreadPool {
# workers: Vec<Worker>,
# sender: mpsc::Sender<Job>,
# }
# struct Job;
#
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender,
}
}
// --snip--
}
# struct Worker {
# id: usize,
# thread: thread::JoinHandle<()>,
# }
#
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
# let thread = thread::spawn(|| {
# receiver;
# });
#
# Worker {
# id,
# thread,
# }
}
}
```
<span class="caption">Listing 20-18: Sharing the receiving end of the channel
between the workers by using `Arc` and `Mutex`</span>
In `ThreadPool::new`, we put the receiving end of the channel in an `Arc` and a
`Mutex`. For each new worker, we clone the `Arc` to bump the reference count so
the workers can share ownership of the receiving end.
With these changes, the code compiles! Were getting there!
Lets finally implement the `execute` method on `ThreadPool`. Were also going
to change the `Job` struct: instead of being a struct, `Job` is going to be a
type alias for a trait object that holds the type of closure that `execute`
receives. We discussed how type aliases can help make long types shorter, and
this is such a case! Take a look at Listing 20-19:
<span class="filename">Filename: src/lib.rs</span>
```rust
// --snip--
# pub struct ThreadPool {
# workers: Vec<Worker>,
# sender: mpsc::Sender<Job>,
# }
# use std::sync::mpsc;
# struct Worker {}
type Job = Box<FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
```
<span class="caption">Listing 20-19: Creating a `Job` type alias for a `Box`
that holds each closure, then sending the job down the channel</span>
After creating a new `Job` instance using the closure we get in
`execute`, we send that job down the sending end of the channel. Were calling
`unwrap` on `send` since sending may fail if the receiving end has stopped
receiving new messages, which would happen if we stop all of our threads from
executing. This isnt possible right now, though, since our threads continue
executing as long as the pool exists. We use `unwrap` since we know the failure
case wont happen even though the compiler cant tell that, which is an
appropriate use of `unwrap` as we discussed in Chapter 9.
Are we done yet? Not quite! In the worker, weve still got a closure being
passed to `thread::spawn` that only *references* the receiving end of the
channel. Instead, we need the closure to loop forever, asking the receiving end
of the channel for a job, and running the job when it gets one. Lets make the
change shown in Listing 20-20 to `Worker::new`:
<span class="filename">Filename: src/lib.rs</span>
```rust,ignore
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
(*job)();
}
});
Worker {
id,
thread,
}
}
}
```
<span class="caption">Listing 20-20: Receiving and executing the jobs in the
workers thread</span>
Here, we first call `lock` on the `receiver` to acquire the mutex, then
`unwrap` to panic on any errors. Acquiring a lock might fail if the mutex is in
a state called *poisoned*, which can happen if some other thread panicked while
holding the lock rather than releasing it. If this thread cant get the lock
for that reason, calling `unwrap` to have this thread panic is the correct
action to take as well. Feel free to change this `unwrap` to an `expect` with
an error message that is meaningful to you if youd like.
If we get the lock on the mutex, then we call `recv` to receive a `Job` from
the channel. A final `unwrap` moves past those errors as well. `recv` will
return `Err` if the thread holding the sending side of the channel has shut
down, similar to how the `send` method returns `Err` if the receiving side
shuts down.
The call to `recv` blocks; that is, if theres no job yet, this thread will sit
here until a job becomes available. The `Mutex<T>` makes sure that only one
`Worker` thread at a time is trying to request a job.
Theoretically, this code should compile. Unfortunately, the Rust compiler isnt
perfect yet, and we get this error:
```text
error[E0161]: cannot move a value of type std::ops::FnOnce() +
std::marker::Send: the size of std::ops::FnOnce() + std::marker::Send cannot be
statically determined
--> src/lib.rs:63:17
|
63 | (*job)();
| ^^^^^^
```
This error is fairly cryptic, and thats because the problem is fairly cryptic.
In order to call a `FnOnce` closure that is stored in a `Box<T>` (which is what
our `Job` type alias is), the closure needs to be able to move itself out of
the `Box<T>` since when we call the closure, it takes ownership of `self`. In
general, moving a value out of a `Box<T>` isnt allowed since Rust doesnt know
how big the value inside the `Box<T>` is going to be; recall in Chapter 15 that
we used `Box<T>` precisely because we had something of an unknown size that we
wanted to store in a `Box<T>` to get a value of a known size.
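For contrast, moving a *sized* value out of a `Box<T>` is perfectly fine; it is only the unsized `FnOnce()` trait object that trips up the compiler here:

```rust
fn main() {
    // Moving a sized value out of a Box is allowed: `*b` dereferences the
    // box and moves the String out, and the box's allocation is freed.
    let b = Box::new(String::from("hello"));
    let s = *b;
    assert_eq!(s, "hello");
    // For an unsized value like `dyn FnOnce()`, this same move is exactly
    // what the compiler rejected in the (*job)() error above: it cannot
    // know the size of the value being moved out.
    println!("{}", s);
}
```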
We saw in Chapter 17, Listing 17-15 that we can write methods that use the
syntax `self: Box<Self>` so that the method takes ownership of a `Self` value
that is stored in a `Box<T>`. Thats what we want to do here, but unfortunately
the part of Rust that implements what happens when we call a closure isnt
implemented using `self: Box<Self>`. So Rust doesnt yet understand that it
could use `self: Box<Self>` in this situation in order to take ownership of the
closure and move the closure out of the `Box<T>`.
In the future, the code in Listing 20-20 should work just fine. Rust is still a
work in progress with places that the compiler could be improved. There are
people just like you working to fix this and other issues! Once youve finished
the book, we would love for you to join in.
But for now, let's work around this problem using a handy trick: we can tell Rust explicitly that in this case we *can* take ownership of the value inside the `Box<T>` using `self: Box<Self>`, and once we have ownership of the closure, we can call it. This involves defining a new trait `FnBox` with a method `call_box` that uses `self: Box<Self>` in its signature, implementing that trait for any type that implements `FnOnce()`, changing our type alias to use the new trait, and changing `Worker` to use the `call_box` method. These changes are shown in Listing 20-21:

<span class="filename">Filename: src/lib.rs</span>
```rust,ignore
trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

type Job = Box<FnBox + Send + 'static>;

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}
```
<span class="caption">Listing 20-21: Adding a new trait `FnBox` to work around
the current limitations of `Box<FnOnce()>`</span>
First, we create a new trait named `FnBox`. This trait has one method,
`call_box`, similar to the `call` methods on the other `Fn*` traits, except
this method takes `self: Box<Self>` in order to take ownership of `self` and
move the value out of the `Box<T>`.
Next, we implement the `FnBox` trait for any type `F` that implements the
`FnOnce()` trait. Effectively, this means that any `FnOnce()` closures can use
our `call_box` method. The implementation of `call_box` uses `(*self)()` to
move the closure out of the `Box<T>` and call the closure.
Instead of `FnOnce()`, we now want our `Job` type alias to be a `Box` of
anything that implements our new trait `FnBox`. This will allow us to use
`call_box` in `Worker` when we get a `Job` value. Because we implemented the
`FnBox` trait for any `FnOnce()` closure, we dont have to change anything
about the actual values were sending down the channel.
Finally, in the closure run in the thread in `Worker::new`, we use `call_box`
instead of invoking the closure directly. Now Rust is able to understand that
what we want to do is fine.
This is a very sneaky, complicated trick. Dont worry too much if it doesnt
make perfect sense; someday, it will be completely unnecessary.
With this trick, our thread pool is in a working state! Give it a `cargo run`,
and make some requests:
```text
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never used: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
warning: field is never used: `id`
--> src/lib.rs:61:5
|
61 | id: usize,
| ^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
warning: field is never used: `thread`
--> src/lib.rs:62:5
|
62 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
Finished dev [unoptimized + debuginfo] target(s) in 0.99 secs
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
```
Success! We now have a thread pool executing connections asynchronously. We
never create more than four threads, so our system wont get overloaded if the
server gets a lot of requests. If we make a request to `/sleep`, the server
will be able to serve other requests by having another thread run them.
What about those warnings, though? Dont we use the `workers`, `id`, and
`thread` fields? Well, right now, were using all three of these fields to hold
onto some data, but we dont actually *do* anything with the data once weve
set up the thread pool and started running the code that sends jobs down the
channel to the threads. If we didnt hold onto these values, though, theyd go
out of scope: for example, if we didnt return the `Vec<Worker>` value as part
of the `ThreadPool`, the vector would get cleaned up at the end of
`ThreadPool::new`.
So are these warnings wrong? In one sense, yes: we *are* using the fields to store data we need to keep around. In another sense, no: the warnings are telling us that we've forgotten to do something. We never clean up our thread pool once it's done being used; we just use <span class="keystroke">ctrl-C</span> to stop the program and let the operating system clean up after us. Let's implement a graceful shutdown that cleans up everything we've created instead.