Main Page   Namespace List   Class Hierarchy   Compound List   File List   Namespace Members   Compound Members   Related Pages   Examples  

callbacks.h

00001 #include <iostream>
00002 #include <assert.h>
00003 #include "string.h"
00004 #include "poller.h"
00005 #include "util_types.h"
00006 #include "SmartPtr.h"
00007 #include "SmallObj.h"
00008 #include "EmptyType.h"
00009 #include "TypeManip.h"
00010 
00011 #ifndef CALLBACKS_H
00012 #define CALLBACKS_H
00013 
// errno is a macro (typically expanding to a thread-local lvalue) on POSIX
// systems, so it has to come from <errno.h>; redeclaring it as a plain
// `extern int` is not portable and can fail to link.
#include <errno.h>
00015 
00016 using namespace fpoller::util_types;
00017 
00018 namespace fpoller {
00019 
00020   namespace writer {
00022     struct close_on_exit{
00023       void retain(int fd){
00024         close(fd);
00025       }
00026     }; 
00027 
00029     struct keep_on_exit{
00030       void retain(int fd){}
00031     }; 
00032 
00034     template <class T>
00035     const T* ptr(const std::vector<T> & v){
00036       return &v[0];
00037     }
00038     
00040     template <class T>
00041     const T* ptr (const std::basic_string<T> & s){
00042       return s.data();
00043     }
00044     
00045 
00048     template <class T>
00049     class content:public Loki::SmallObject{
00050     public:
00051       typedef typename T::value_type value_type;
00052       content(T *t):t_(t){}
00053       content(Loki::SmartPtr<T> t, int size = -1):t_(t),size_(size >-1 ? size : t->size()*sizeof(value_type)){}
00054       const value_type *ptr() { return writer::ptr(*t_);}
00055       size_t size() {return size_; }
00056     private:
00057       Loki::SmartPtr<T> t_;
00058       size_t size_;
00059     };
00060 
00062     template <>
00063     class content<const char>:public Loki::SmallObject{
00064     public:
00065       typedef char value_type;
00066       content(const char *pc,int size=-1)
00067         :pc_(pc),size_(size >-1 ? size : strlen(pc_)){}
00068 
00069       const char *ptr() { return pc_;}
00070 
00071       size_t size() { return size_; }
00072     private:
00073       const char *pc_;
00074       size_t size_;
00075     };
00076 
00077   }
00078 
00083   template <class T, class RetentionPolicy>
00084   class write_callback:public callback,public Loki::SmallObject, public RetentionPolicy{
00085     writer::content<T> content_;
00086     int num_sent_;
00087   public:
00088     static int count_;
00090     write_callback(T *t):content_(t),num_sent_(0){}
00092     write_callback(T *t,int size):content_(t,size),num_sent_(0){}
00094     ret_val operator()(int fd){
00095       int curr_sent = send(fd,content_.ptr()+num_sent_,content_.size()-num_sent_,0);
00096       if (curr_sent > 0){
00097         num_sent_ += curr_sent;
00098         if (num_sent_ >= static_cast<int>(content_.size())){
00099           retain(fd);
00100           return REMOVE;
00101         }else{
00102           return KEEP;
00103         }
00104       }else if (curr_sent == 0){
00105         //socket closed
00106         close(fd);
00107         return REMOVE;
00108       }else { //(curr_sent < 0)
00109         if (would_block(errno)){
00110           return KEEP;
00111         }else{
00112           close(fd);
00113           return REMOVE;
00114         }
00115       }
00116     }//operator()
00117   };
00118 
00119   namespace reader{
00120   
00122     struct purge{
00123       static ret_val should_keep(svec &,vec::iterator, size_t &){return REMOVE;}
00124     };
00125 
00127     struct next_request {
00128       ret_val should_keep(svec &full_request,vec::iterator curr_request_end, size_t &num_received){
00129         num_received -= curr_request_end - full_request->begin();
00130         full_request = new vec(++curr_request_end,full_request->end());
00131         return KEEP;
00132       }
00133     };
00134     
00136     struct stderr_report{
00137       static void handle(int errnum){ std::cerr << strerror(errnum)<<std::endl; }
00138     };
00139 
00141     class termination_string{
00142     private:
00143       static std::string term_seq_;
00144     public:
00145       static vec::iterator is_complete(vec &request, size_t &num_received, int curr_received){
00146         assert(term_seq_.size() > 0);
00147         vec::iterator retval =  search(request.begin(),request.end(),
00148                                                term_seq_.begin(),term_seq_.end());
00149         if (retval == request.end())
00150           return retval;
00151         else
00152           return retval+term_seq_.size();
00153       }
00154       static void set_terminator(const std::string &s){term_seq_ = s;}
00155       static void set_terminator(const char *s){term_seq_ = s;}
00156     };
00157     std::string termination_string::term_seq_;
00158 
00160     class reach_eof{
00161     public:
00162       static vec::iterator is_complete(vec &request, size_t &num_received, int curr_received){
00163         return (curr_received == 0) ? request.begin() : request.end();
00164       }
00165     };
00166 
00168     struct socket_read{
00169       unsigned dummy;
00170       int read(int fd, vec &request, int num_received){
00171         return recvfrom(fd,&request[num_received],request.size()-num_received,0, NULL, &dummy);
00172       }
00173     };
00174 
00176     struct file_read{
00177       static int read(int fd, vec &request, int num_received){
00178         return ::read(fd,&request[num_received],request.size()-num_received);
00179       }
00180     };
00181   }//namespace reader
00182 
00195   template<class Parser,
00196     class ReadingPolicy = reader::socket_read,
00197     class KeepAlivePolicy = reader::purge, 
00198     class ErrorHandlerPolicy = reader::stderr_report,
00199     class ReadCompletionPolicy = reader::termination_string,
00200     class Poller = poller>
00201   class read_callback:public fpoller::callback,public Loki::SmallObject,public Parser, public KeepAlivePolicy, 
00202                       public ReadingPolicy,public ErrorHandlerPolicy, public ReadCompletionPolicy{
00203     unsigned int dummy;
00204     size_t block_size_;
00205     svec request_;
00206     size_t num_received_;
00207   public:
00208     read_callback(size_t block_size=4096):block_size_(block_size),request_(new vec(block_size)),num_received_(0){}
00209     ret_val operator()(int fd){
00210       if (num_received_+block_size_ > request_->size()) request_->resize(request_->size()+block_size_);
00211       int curr_received = read(fd, (*request_), num_received_);
00212       if (curr_received < 0 ) {
00213         if (would_block(errno)){
00214           return KEEP;
00215         }else{
00216           handle(errno);
00217           close(fd);
00218           return REMOVE;
00219         }
00220         //      }else if (curr_received == 0){
00221         //return REMOVE; //EOF on channel
00222       }else{ //curr_received > 0
00223         num_received_ += curr_received;
00224         vec::iterator request_end;
00225         if ((request_end = is_complete((*request_),num_received_,curr_received)) != request_->end()){
00226           parse(fd,request_,num_received_);
00227           return should_keep(request_,request_end,num_received_);
00228         }
00229         return KEEP;
00230       }
00231     }//operator()
00232   };
00233 
00234   namespace accepter{
00236     struct discard_peer{
00237       void peer(const sockaddr_in & sa, int fd){}
00238     };
00239   }//namespace accepter
00240 
00246   template <typename CallBack, typename Poller=poller, typename PeerPolicy = accepter::discard_peer>
00247     class accept_callback : public callback, protected PeerPolicy{
00248       Poller *poller_;
00249       short interest_;
00250     public:
00251       accept_callback(Poller *p, int i = READ):poller_(p),interest_(i){}
00254       ret_val operator()(int fd) {
00255         for(;;){
00256           sockaddr_in addr;
00257           socklen_t len = sizeof(sockaddr_in);;
00258           int conn_fd = accept(fd,(sockaddr *)&addr, &len);
00259           if (conn_fd < 0){
00260             if (would_block(errno)){
00261               break;
00262             }else{
00263               throw new exception ("unexpected error accepting",errno);
00264             }
00265           }else{
00266             //we have a bona fide, accepted connection
00267             set_non_blocking(conn_fd);
00268             poller_->add(conn_fd, interest_, new CallBack());
00269             peer(addr,conn_fd);
00270           }
00271         }
00272         //we don't want to stop accepting
00273         return KEEP;
00274       }
00275 
00276 
00277     };
00278 
00279 
00280 }//namespace fpoller
00281 
00282 #endif //CALLBACKS_H

Generated at Wed Oct 16 16:02:39 2002 for fpoller by doxygen 1.2.9.1, written by Dimitri van Heesch, © 1997-2001