1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::collections::{VecDeque};
use std::net::{SocketAddr};
use mio::{Handler, EventLoop, Token, EventSet, PollOpt};
use mio::udp::{UdpSocket};
use buffer::{BufferPool, Buffer};
use provider::{self, Provider};
/// Callbacks that `DispatchHandler` forwards event-loop activity to.
///
/// Every method has an empty default body, so an implementor only overrides
/// the events it cares about. Each callback receives a `Provider` that the
/// handler builds from its buffer pool, its outgoing queue and the event loop.
pub trait Dispatcher: Sized {
/// Timeout type; `DispatchHandler` exposes it as its `Handler::Timeout`.
type Timeout;
/// Message type; `DispatchHandler` exposes it as its `Handler::Message`.
type Message: Send;
/// Called from `DispatchHandler::ready` after a datagram was read from the
/// UDP socket. `message` is the content of the receive buffer and `addr` is
/// the address that `recv_from` reported for it.
// The default body is empty, so the parameters are intentionally unused.
#[allow(unused)]
fn incoming<'a>(&mut self, provider: Provider<'a, Self>, message: &[u8], addr: SocketAddr) { }
/// Called when the event loop hands a `Self::Message` notification to the
/// handler.
// The default body is empty, so the parameters are intentionally unused.
#[allow(unused)]
fn notify<'a>(&mut self, provider: Provider<'a, Self>, message: Self::Message) { }
/// Called when the event loop hands a `Self::Timeout` to the handler.
// The default body is empty, so the parameters are intentionally unused.
#[allow(unused)]
fn timeout<'a>(&mut self, provider: Provider<'a, Self>, timeout: Self::Timeout) { }
}
/// Token under which the UDP socket is registered (and re-registered) with the
/// event loop; `ready` ignores events carrying any other token.
// NOTE(review): nothing in this file explains why the value is 2 -- confirm
// whether other tokens are reserved elsewhere.
const UDP_SOCKET_TOKEN: Token = Token(2);
/// mio `Handler` that owns a UDP socket and forwards socket, notify and
/// timeout events to a user supplied `Dispatcher`.
pub struct DispatchHandler<D: Dispatcher> {
// Receiver of the `incoming` / `notify` / `timeout` callbacks.
dispatch: D,
// Pending (payload, destination) pairs; `handle_write` sends from the front.
// A mutable reference is handed to every `Provider`, presumably so that
// dispatchers can enqueue outgoing datagrams -- confirm in `provider`.
out_queue: VecDeque<(Buffer, SocketAddr)>,
// Socket used for both `recv_from` and `send_to`.
udp_socket: UdpSocket,
// Source of receive buffers; buffers are pushed back here after use.
buffer_pool: BufferPool,
// Event set most recently (re)registered for the socket; recomputed in `tick`.
current_set: EventSet
}
impl<D: Dispatcher> DispatchHandler<D> {
    /// Builds a handler around `udp_socket` and registers the socket with
    /// `event_loop` for edge-triggered readable events under
    /// `UDP_SOCKET_TOKEN`.
    ///
    /// `buffer_size` is passed to `BufferPool::new`; `dispatch` receives all
    /// callbacks produced by this handler.
    ///
    /// # Panics
    ///
    /// Panics if the socket cannot be registered with the event loop.
    pub fn new(udp_socket: UdpSocket, buffer_size: usize, dispatch: D, event_loop: &mut EventLoop<DispatchHandler<D>>)
        -> DispatchHandler<D> {
        let buffer_pool = BufferPool::new(buffer_size);
        let out_queue = VecDeque::new();

        event_loop.register(&udp_socket, UDP_SOCKET_TOKEN, EventSet::readable(), PollOpt::edge()).unwrap();

        DispatchHandler{ dispatch: dispatch, out_queue: out_queue, udp_socket: udp_socket,
            buffer_pool: buffer_pool, current_set: EventSet::readable() }
    }

    /// Tries to send the datagram at the front of the outgoing queue.
    ///
    /// Does nothing when the queue is empty. Otherwise exactly one send is
    /// attempted:
    ///
    /// * sent: the buffer goes back to the pool;
    /// * socket would block (`Ok(None)`): the datagram is put back at the
    ///   front of the queue so ordering is kept and it is retried on a later
    ///   writable event;
    /// * I/O error: the datagram is dropped and the buffer goes back to the
    ///   pool. UDP gives no delivery guarantee anyway, and one failing
    ///   destination must not take down the whole event loop.
    pub fn handle_write(&mut self) {
        let (buffer, addr) = match self.out_queue.pop_front() {
            Some(entry) => entry,
            None => return
        };

        // Bind the result first so the borrows of `buffer` and `addr` are over
        // before either of them is moved below.
        let outcome = self.udp_socket.send_to(buffer.as_ref(), &addr);

        match outcome {
            Ok(None) => {
                // Not writable right now; `tick` keeps writable interest
                // registered for as long as the queue is non-empty.
                self.out_queue.push_front((buffer, addr));
            },
            Ok(Some(_)) => {
                self.buffer_pool.push(buffer);
            },
            Err(_) => {
                // This signature has no channel to report the error, so the
                // datagram is discarded; the buffer is still recycled.
                self.buffer_pool.push(buffer);
            }
        }
    }

    /// Tries to receive one datagram from the socket.
    ///
    /// Returns the filled buffer together with the address reported by
    /// `recv_from`; the caller is expected to push the buffer back into the
    /// pool when done (as `ready` does). Returns `None` when nothing was
    /// received or the receive failed; in that case the unused buffer is
    /// returned to the pool right here instead of being dropped.
    pub fn handle_read(&mut self) -> Option<(Buffer, SocketAddr)> {
        let mut buffer = self.buffer_pool.pop();
        let received = self.udp_socket.recv_from(buffer.as_mut());

        match received {
            Ok(Some((bytes, addr))) => {
                buffer.set_written(bytes);
                Some((buffer, addr))
            },
            _ => {
                self.buffer_pool.push(buffer);
                None
            }
        }
    }
}
impl<D: Dispatcher> Handler for DispatchHandler<D> {
    type Timeout = D::Timeout;
    type Message = D::Message;

    /// Reacts to readiness of the UDP socket; events for any other token are
    /// ignored.
    ///
    /// A writable event triggers one `handle_write`. A readable event triggers
    /// one `handle_read`; if that yields a datagram it is passed to
    /// `Dispatcher::incoming` and its buffer is pushed back into the pool
    /// afterwards.
    fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
        if token != UDP_SOCKET_TOKEN {
            return;
        }

        if events.is_writable() {
            self.handle_write();
        }

        if events.is_readable() {
            if let Some((datagram, sender)) = self.handle_read() {
                // The provider only borrows the pool and the queue for the
                // duration of this call, so the buffer can be recycled below.
                self.dispatch.incoming(
                    provider::new(&mut self.buffer_pool, &mut self.out_queue, event_loop),
                    datagram.as_ref(),
                    sender
                );
                self.buffer_pool.push(datagram);
            }
        }
    }

    /// Forwards an event-loop notification to `Dispatcher::notify`.
    fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {
        self.dispatch.notify(
            provider::new(&mut self.buffer_pool, &mut self.out_queue, event_loop),
            msg
        );
    }

    /// Forwards an expired timeout to `Dispatcher::timeout`.
    fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {
        self.dispatch.timeout(
            provider::new(&mut self.buffer_pool, &mut self.out_queue, event_loop),
            timeout
        );
    }

    /// Recomputes the socket's interest set and re-registers it.
    ///
    /// Readable interest is always requested; writable interest is added only
    /// while datagrams are waiting in the outgoing queue.
    ///
    /// # Panics
    ///
    /// Panics if re-registering the socket fails.
    fn tick(&mut self, event_loop: &mut EventLoop<Self>) {
        let interest = if self.out_queue.is_empty() {
            EventSet::readable()
        } else {
            EventSet::readable() | EventSet::writable()
        };
        self.current_set = interest;

        event_loop.reregister(&self.udp_socket, UDP_SOCKET_TOKEN, interest, PollOpt::edge()).unwrap();
    }
}