From 2c84f7ebbc96a861096266087a868ba07c66bbce Mon Sep 17 00:00:00 2001 From: Yaser Hsueh Date: Sun, 8 Jan 2023 00:38:58 +0800 Subject: [PATCH] feature: multi-threads support --- src/{ => bin}/main.rs | 34 ++++++++++------- src/lib.rs | 89 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 13 deletions(-) rename src/{ => bin}/main.rs (69%) create mode 100644 src/lib.rs diff --git a/src/main.rs b/src/bin/main.rs similarity index 69% rename from src/main.rs rename to src/bin/main.rs index 8e84b8b..33b0938 100644 --- a/src/main.rs +++ b/src/bin/main.rs @@ -4,6 +4,8 @@ use clap::{App, Arg}; use std::io::prelude::*; use std::net::{TcpListener, TcpStream}; +use echo_ip::ThreadPool; + fn main() { let matches = App::new("EchoIP") .version("1.0") @@ -32,6 +34,7 @@ fn main() { let bind_address = host.to_owned() + ":" + port; let listener = TcpListener::bind(&bind_address); + let pool = ThreadPool::new(4); match listener { Ok(n) => { @@ -40,25 +43,30 @@ fn main() { println!("-----------------------------------------------------------"); for stream in n.incoming() { let stream = stream.unwrap(); - handle_connection(stream); + //thread::spawn(|| { + pool.execute(|| { + handle_connection(stream); + }); } - }, + } Err(_e) => println!("Address already in use: {}", bind_address), } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 1024]; - let remote_addr = stream.peer_addr().unwrap().ip().to_string(); - - stream.read(&mut buffer).unwrap(); - - let response = "HTTP/1.1 200 OK\r\n\r\n"; - - stream.write(response.as_bytes()).unwrap(); - stream.write(remote_addr.as_bytes()).unwrap(); - stream.write("\r\n".as_bytes()).unwrap(); - stream.flush().unwrap(); - + let remote_addr = stream.peer_addr().unwrap().ip(); + println!("{}", stream.read(&mut buffer).unwrap()); + println!("{}", String::from_utf8_lossy(&buffer[..])); + //stream.read(&mut buffer).unwrap(); + let get = b"GET / HTTP/1.1\r\n"; + let (status_line, contents) = if buffer.starts_with(get) { + ("HTTP/1.1 200 OK", remote_addr.to_string()) + } else { + ("HTTP/1.1 404 NOT FOUND", "URL ERROR".to_string()) + }; println!("echo {}", remote_addr); + let response = format!("{}\r\n\r\n{}\r\n", status_line, contents); + stream.write(response.as_bytes()).unwrap(); + stream.flush().unwrap(); } diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..8afcb6b --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,89 @@ +use std::sync::mpsc; +use std::sync::Arc; +use std::sync::Mutex; +use std::thread; + +enum Message { + NewJob(Job), + Terminate, +} + +pub struct ThreadPool { + workers: Vec, + sender: mpsc::Sender, +} + +//struct Job; +type Job = Box; + +impl ThreadPool { + pub fn new(size: usize) -> ThreadPool { + assert!(size > 0); + + let (sender, receiver) = mpsc::channel(); + let receiver = Arc::new(Mutex::new(receiver)); + let mut workers = Vec::with_capacity(size); + + for id in 0..size { + workers.push(Worker::new(id, Arc::clone(&receiver))) + } + ThreadPool { workers, sender } + } + + pub fn execute(&self, f: F) + where + F: FnOnce() + Send + 'static, + { + let job = Box::new(f); + self.sender.send(Message::NewJob(job)).unwrap(); + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + println!("Sending terminate message to all wokers."); + + for _ in &mut self.workers { + self.sender.send(Message::Terminate).unwrap(); + } + + println!("Shutting down all workers."); + + for worker in &mut self.workers { + println!("shutdown down work {}", worker.id); + if let Some(thread) = worker.thread.take() { + thread.join().unwrap(); + } + } + } +} + +pub struct Worker { + id: usize, + thread: Option>, +} + +impl Worker { + fn new(id: usize, receiver: Arc>>) -> Worker { + let thread = thread::spawn(move || { + //receiver; + loop { + let message = receiver.lock().unwrap().recv().unwrap(); + match message { + Message::NewJob(job) => { + println!("worker {} got a job; executing.", id); + job(); + } + Message::Terminate => { + println!("worker {} was told to terminate.", id); + break; + } + } + } + }); + Worker { + id, + thread: Some(thread), + } + } +}