1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::collections::{VecDeque};
use std::net::{SocketAddr};

use mio::{Handler, EventLoop, Token, EventSet, PollOpt};
use mio::udp::{UdpSocket};

use buffer::{BufferPool, Buffer};
use provider::{self, Provider};

/// Handles events occurring within the event loop.
///
/// Every method has an empty default body, so implementors only need to override the
/// callbacks they care about. Each callback receives a `Provider` that `DispatchHandler`
/// builds from its buffer pool, its queue of outgoing datagrams and the event loop.
pub trait Dispatcher: Sized {
    /// Timeout payload type; `DispatchHandler` re-exports it as its `Handler::Timeout`.
    type Timeout;
    /// Channel message type; `DispatchHandler` re-exports it as its `Handler::Message`.
    type Message: Send;
    
    /// Process an incoming message from the given address.
    ///
    /// Invoked by `DispatchHandler` after a datagram has been read from the UDP socket;
    /// `addr` is the sender address returned by that read.
    // The default body ignores its arguments, hence the lint allowance.
    #[allow(unused)]
    fn incoming<'a>(&mut self, provider: Provider<'a, Self>, message: &[u8], addr: SocketAddr) { }
    
    /// Process a message sent via the event loop channel.
    ///
    /// Invoked by `DispatchHandler` from its `Handler::notify` implementation.
    // The default body ignores its arguments, hence the lint allowance.
    #[allow(unused)]
    fn notify<'a>(&mut self, provider: Provider<'a, Self>, message: Self::Message) { }
    
    /// Process a timeout that has been triggered.
    ///
    /// Invoked by `DispatchHandler` from its `Handler::timeout` implementation.
    // The default body ignores its arguments, hence the lint allowance.
    #[allow(unused)]
    fn timeout<'a>(&mut self, provider: Provider<'a, Self>, timeout: Self::Timeout) { }
}

//----------------------------------------------------------------------------//

/// Token under which the UDP socket is registered with the event loop; readiness events
/// carrying any other token are ignored by the handler.
// NOTE(review): the value 2 looks arbitrary from this file alone — confirm nothing else depends on it.
const UDP_SOCKET_TOKEN: Token = Token(2);

/// mio `Handler` that owns a UDP socket and forwards its events to a user supplied `Dispatcher`.
pub struct DispatchHandler<D: Dispatcher> {
    /// User supplied dispatcher that receives the `incoming`/`notify`/`timeout` callbacks.
    dispatch:    D,
    /// Datagrams waiting to be sent, each paired with its destination address.
    out_queue:   VecDeque<(Buffer, SocketAddr)>,
    /// Socket that datagrams are read from and written to.
    udp_socket:  UdpSocket,
    /// Pool that read buffers are popped from and that used buffers are pushed back into.
    buffer_pool: BufferPool,
    /// Event set most recently chosen for the socket registration (updated on every tick).
    current_set: EventSet
}

impl<D: Dispatcher> DispatchHandler<D> {
    /// Creates a handler around `udp_socket` and registers that socket with `event_loop`.
    ///
    /// The socket is registered under `UDP_SOCKET_TOKEN` for edge-triggered readable events;
    /// `buffer_size` is forwarded to `BufferPool::new` and `dispatch` receives all callbacks.
    ///
    /// # Panics
    ///
    /// Panics if the socket cannot be registered with the event loop.
    pub fn new(udp_socket: UdpSocket, buffer_size: usize, dispatch: D, event_loop: &mut EventLoop<DispatchHandler<D>>)
        -> DispatchHandler<D> {
        let buffer_pool = BufferPool::new(buffer_size);
        let out_queue = VecDeque::new();
        
        event_loop.register(&udp_socket, UDP_SOCKET_TOKEN, EventSet::readable(), PollOpt::edge()).unwrap();
        
        DispatchHandler{ dispatch: dispatch, out_queue: out_queue, udp_socket: udp_socket,
            buffer_pool: buffer_pool, current_set: EventSet::readable() }
    } 
    
    /// Attempts to send the datagram at the front of the outgoing queue.
    ///
    /// Does nothing when the queue is empty. If the socket reports that it would block,
    /// the datagram is put back at the front of the queue so that ordering is preserved and
    /// the send is retried on a later writable event. Otherwise the buffer is returned to
    /// the pool, whether the send succeeded or failed.
    pub fn handle_write(&mut self) {
        let (buffer, addr) = match self.out_queue.pop_front() {
            Some(entry) => entry,
            None => return
        };
        
        let sent = self.udp_socket.send_to(buffer.as_ref(), &addr);
        
        match sent {
            // Nothing was written because the socket was not ready; keep the datagram
            // instead of silently losing it.
            Ok(None) => self.out_queue.push_front((buffer, addr)),
            Ok(Some(_)) => self.buffer_pool.push(buffer),
            // UDP delivery is best effort: a failed send (e.g. an unreachable destination)
            // drops this one datagram rather than panicking inside the event loop.
            Err(_) => self.buffer_pool.push(buffer)
        }
    }
    
    /// Attempts to read a single datagram from the socket into a pooled buffer.
    ///
    /// Returns the filled buffer together with the sender address, or `None` when the
    /// socket had nothing to read or the read failed. In the `None` case the buffer is
    /// handed back to the pool; on success the caller is responsible for returning it.
    pub fn handle_read(&mut self) -> Option<(Buffer, SocketAddr)> {
        let mut buffer = self.buffer_pool.pop();
        
        let received = self.udp_socket.recv_from(buffer.as_mut());
        
        match received {
            Ok(Some((bytes, addr))) => {
                buffer.set_written(bytes);
                
                Some((buffer, addr))
            },
            _ => {
                // Recycle the unused buffer so the pool does not shrink on every empty read.
                self.buffer_pool.push(buffer);
                
                None
            }
        }
    }
}

impl<D: Dispatcher> Handler for DispatchHandler<D> {
    type Timeout = D::Timeout;
    type Message = D::Message;
    
    /// Reacts to readiness events for the UDP socket; events for any other token are ignored.
    ///
    /// A writable event sends at most one queued datagram. A readable event reads at most
    /// one datagram, passes it to `Dispatcher::incoming` and then returns the buffer to the pool.
    fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
        if token != UDP_SOCKET_TOKEN {
            return
        }
        
        if events.is_writable() {
            self.handle_write();
        }
        
        if !events.is_readable() {
            return
        }
        
        let (datagram, sender) = match self.handle_read() {
            Some(received) => received,
            None => return
        };
        
        // The provider borrows the pool and the queue, so it is confined to the call
        // expression and gone before the buffer is pushed back below.
        self.dispatch.incoming(provider::new(&mut self.buffer_pool, &mut self.out_queue, event_loop),
            datagram.as_ref(), sender);
        
        self.buffer_pool.push(datagram);
    }
    
    /// Forwards a message received over the event loop channel to `Dispatcher::notify`.
    fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {
        self.dispatch.notify(provider::new(&mut self.buffer_pool, &mut self.out_queue, event_loop), msg);
    }
    
    /// Forwards a triggered timeout to `Dispatcher::timeout`.
    fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {
        self.dispatch.timeout(provider::new(&mut self.buffer_pool, &mut self.out_queue, event_loop), timeout);
    }
    
    /// Re-registers the socket on every tick: readable only while the outgoing queue is
    /// empty, readable and writable while datagrams are waiting to be sent.
    ///
    /// Panics if the re-registration fails.
    fn tick(&mut self, event_loop: &mut EventLoop<Self>) {
        let interest = if self.out_queue.is_empty() {
            EventSet::readable()
        } else {
            EventSet::readable() | EventSet::writable()
        };
        self.current_set = interest;
        
        event_loop.reregister(&self.udp_socket, UDP_SOCKET_TOKEN, interest, PollOpt::edge()).unwrap();
    }
}