From 9c41fa761ff85deee20bba769bfc59911e845637 Mon Sep 17 00:00:00 2001 From: Yaser Hsueh Date: Sun, 8 Jan 2023 02:41:06 +0800 Subject: [PATCH] feat: shutdown graceful --- Cargo.toml | 1 + src/lib.rs | 12 ++------ src/{bin => }/main.rs | 72 ++++++++++++++++++++++++++----------------- 3 files changed, 47 insertions(+), 38 deletions(-) rename src/{bin => }/main.rs (70%) diff --git a/Cargo.toml b/Cargo.toml index 853caad..09bbf39 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,3 +7,4 @@ edition = "2021" [dependencies] clap = "2.33.1" +ctrlc = {version = "3.2.4", features=["termination"]} diff --git a/src/lib.rs b/src/lib.rs index 8afcb6b..91d00de 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ +use std::thread; use std::sync::mpsc; use std::sync::Arc; use std::sync::Mutex; -use std::thread; enum Message { NewJob(Job), @@ -13,13 +13,11 @@ pub struct ThreadPool { sender: mpsc::Sender, } -//struct Job; type Job = Box; impl ThreadPool { pub fn new(size: usize) -> ThreadPool { assert!(size > 0); - let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); @@ -29,7 +27,6 @@ impl ThreadPool { } ThreadPool { workers, sender } } - pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static, @@ -42,13 +39,10 @@ impl ThreadPool { impl Drop for ThreadPool { fn drop(&mut self) { println!("Sending terminate message to all wokers."); - for _ in &mut self.workers { self.sender.send(Message::Terminate).unwrap(); } - println!("Shutting down all workers."); - for worker in &mut self.workers { println!("shutdown down work {}", worker.id); if let Some(thread) = worker.thread.take() { @@ -71,11 +65,11 @@ impl Worker { let message = receiver.lock().unwrap().recv().unwrap(); match message { Message::NewJob(job) => { - println!("worker {} got a job; executing.", id); + //println!("worker {} got a job; executing.", id); job(); } Message::Terminate => { - println!("worker {} was told to terminate.", id); + //println!("worker {} was told to terminate.", id); break; } } diff --git a/src/bin/main.rs b/src/main.rs similarity index 70% rename from src/bin/main.rs rename to src/main.rs index 33b0938..bbd9cc4 100644 --- a/src/bin/main.rs +++ b/src/main.rs @@ -1,13 +1,51 @@ extern crate clap; -use clap::{App, Arg}; - +use clap::{App, Arg, ArgMatches}; use std::io::prelude::*; use std::net::{TcpListener, TcpStream}; +use std::process; +use std::sync::mpsc::channel; +use std::thread; +use ctrlc; use echo_ip::ThreadPool; fn main() { - let matches = App::new("EchoIP") + let (tx, rx) = channel(); + ctrlc::set_handler(move || tx.send(()).expect("Could not send signal on channel.")) + .expect("Error setting Ctrl+C handler"); + thread::spawn(move || { + let matches = handle_args(); + let host = matches.value_of("server").unwrap_or("0.0.0.0"); + let port = matches.value_of("port").unwrap_or("8888"); + create_listener(host, port); + }); + rx.recv().expect("Could not receive from channel."); + println!("Exiting..."); + process::exit(0); +} + +fn create_listener(host: &str, port: &str) { + let bind_address = host.to_owned() + ":" + port; + let listener = TcpListener::bind(&bind_address); + let pool = ThreadPool::new(4); + match listener { + Ok(n) => { + println!("-----------------------------------------------------------"); + println!("Server listening on {}", &port); + println!("-----------------------------------------------------------"); + for stream in n.incoming() { + let stream = stream.unwrap(); + pool.execute(|| { + handle_connection(stream); + }); + } + } + Err(_e) => println!("Address already in use: {}", bind_address), + } +} + +fn handle_args() -> ArgMatches<'static> { + return App::new("echo-ip") .version("1.0") .author("Yaser Hsueh ") .about("https://www.simaek.com") @@ -28,37 +66,13 @@ fn main() { .takes_value(true), ) .get_matches(); - - let host = matches.value_of("server").unwrap_or("0.0.0.0"); - let port = matches.value_of("port").unwrap_or("8888"); - let bind_address = host.to_owned() + ":" + port; - - let listener = TcpListener::bind(&bind_address); - let pool = ThreadPool::new(4); - - match listener { - Ok(n) => { - println!("-----------------------------------------------------------"); - println!("Server listening on {}", &port); - println!("-----------------------------------------------------------"); - for stream in n.incoming() { - let stream = stream.unwrap(); - //thread::spawn(|| { - pool.execute(|| { - handle_connection(stream); - }); - } - } - Err(_e) => println!("Address already in use: {}", bind_address), - } } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 1024]; let remote_addr = stream.peer_addr().unwrap().ip(); - println!("{}", stream.read(&mut buffer).unwrap()); - println!("{}", String::from_utf8_lossy(&buffer[..])); - //stream.read(&mut buffer).unwrap(); + //println!("{}", String::from_utf8_lossy(&buffer[..])); + stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; let (status_line, contents) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK", remote_addr.to_string())