00001 #include <iostream>
00002 #include <assert.h>
00003 #include "string.h"
00004 #include "poller.h"
00005 #include "util_types.h"
00006 #include "SmartPtr.h"
00007 #include "SmallObj.h"
00008 #include "EmptyType.h"
00009 #include "TypeManip.h"
00010
00011 #ifndef CALLBACKS_H
00012 #define CALLBACKS_H
00013
// errno has to come from <errno.h>: on current C libraries it is a macro that
// expands to a per-thread lvalue, so a hand-written `extern int errno;`
// declaration is not portable (POSIX leaves the behaviour undefined).
#include <errno.h>
00015
00016 using namespace fpoller::util_types;
00017
00018 namespace fpoller {
00019
00020 namespace writer {
/// Retention policy: the descriptor handed to retain() is closed.
struct close_on_exit {
    /// Closes @p fd.
    /// @param fd descriptor the owning callback has finished with.
    void retain(int fd)
    {
        close(fd);
    }
};
00027
/// Retention policy: the descriptor handed to retain() is left untouched.
struct keep_on_exit {
    /// Deliberately does nothing, so the descriptor stays open.
    void retain(int /*fd*/)
    {
    }
};
00032
/// Returns a pointer to the first element of @p v.
///
/// Indexing element 0 of an empty vector is undefined behaviour, so an empty
/// vector yields a null pointer instead.
///
/// @param v vector whose storage is wanted.
/// @return pointer to v's first element, or a null pointer when v is empty.
template <class T>
const T* ptr(const std::vector<T> & v){
    if (v.empty())
        return static_cast<const T *>(0);
    return &v[0];
}
00038
00040 template <class T>
00041 const T* ptr (const std::basic_string<T> & s){
00042 return s.data();
00043 }
00044
00045
00048 template <class T>
00049 class content:public Loki::SmallObject{
00050 public:
00051 typedef typename T::value_type value_type;
00052 content(T *t):t_(t){}
00053 content(Loki::SmartPtr<T> t, int size = -1):t_(t),size_(size >-1 ? size : t->size()*sizeof(value_type)){}
00054 const value_type *ptr() { return writer::ptr(*t_);}
00055 size_t size() {return size_; }
00056 private:
00057 Loki::SmartPtr<T> t_;
00058 size_t size_;
00059 };
00060
00062 template <>
00063 class content<const char>:public Loki::SmallObject{
00064 public:
00065 typedef char value_type;
00066 content(const char *pc,int size=-1)
00067 :pc_(pc),size_(size >-1 ? size : strlen(pc_)){}
00068
00069 const char *ptr() { return pc_;}
00070
00071 size_t size() { return size_; }
00072 private:
00073 const char *pc_;
00074 size_t size_;
00075 };
00076
00077 }
00078
00083 template <class T, class RetentionPolicy>
00084 class write_callback:public callback,public Loki::SmallObject, public RetentionPolicy{
00085 writer::content<T> content_;
00086 int num_sent_;
00087 public:
00088 static int count_;
00090 write_callback(T *t):content_(t),num_sent_(0){}
00092 write_callback(T *t,int size):content_(t,size),num_sent_(0){}
00094 ret_val operator()(int fd){
00095 int curr_sent = send(fd,content_.ptr()+num_sent_,content_.size()-num_sent_,0);
00096 if (curr_sent > 0){
00097 num_sent_ += curr_sent;
00098 if (num_sent_ >= static_cast<int>(content_.size())){
00099 retain(fd);
00100 return REMOVE;
00101 }else{
00102 return KEEP;
00103 }
00104 }else if (curr_sent == 0){
00105
00106 close(fd);
00107 return REMOVE;
00108 }else {
00109 if (would_block(errno)){
00110 return KEEP;
00111 }else{
00112 close(fd);
00113 return REMOVE;
00114 }
00115 }
00116 }
00117 };
00118
00119 namespace reader{
00120
00122 struct purge{
00123 static ret_val should_keep(svec &,vec::iterator, size_t &){return REMOVE;}
00124 };
00125
00127 struct next_request {
00128 ret_val should_keep(svec &full_request,vec::iterator curr_request_end, size_t &num_received){
00129 num_received -= curr_request_end - full_request->begin();
00130 full_request = new vec(++curr_request_end,full_request->end());
00131 return KEEP;
00132 }
00133 };
00134
/// Error-handler policy that prints the strerror() text for an error number
/// on std::cerr, followed by std::endl.
struct stderr_report {
    /// @param errnum error number to describe.
    static void handle(int errnum)
    {
        const char *message = strerror(errnum);
        std::cerr << message << std::endl;
    }
};
00139
00141 class termination_string{
00142 private:
00143 static std::string term_seq_;
00144 public:
00145 static vec::iterator is_complete(vec &request, size_t &num_received, int curr_received){
00146 assert(term_seq_.size() > 0);
00147 vec::iterator retval = search(request.begin(),request.end(),
00148 term_seq_.begin(),term_seq_.end());
00149 if (retval == request.end())
00150 return retval;
00151 else
00152 return retval+term_seq_.size();
00153 }
00154 static void set_terminator(const std::string &s){term_seq_ = s;}
00155 static void set_terminator(const char *s){term_seq_ = s;}
00156 };
00157 std::string termination_string::term_seq_;
00158
00160 class reach_eof{
00161 public:
00162 static vec::iterator is_complete(vec &request, size_t &num_received, int curr_received){
00163 return (curr_received == 0) ? request.begin() : request.end();
00164 }
00165 };
00166
00168 struct socket_read{
00169 unsigned dummy;
00170 int read(int fd, vec &request, int num_received){
00171 return recvfrom(fd,&request[num_received],request.size()-num_received,0, NULL, &dummy);
00172 }
00173 };
00174
00176 struct file_read{
00177 static int read(int fd, vec &request, int num_received){
00178 return ::read(fd,&request[num_received],request.size()-num_received);
00179 }
00180 };
00181 }
00182
00195 template<class Parser,
00196 class ReadingPolicy = reader::socket_read,
00197 class KeepAlivePolicy = reader::purge,
00198 class ErrorHandlerPolicy = reader::stderr_report,
00199 class ReadCompletionPolicy = reader::termination_string,
00200 class Poller = poller>
00201 class read_callback:public fpoller::callback,public Loki::SmallObject,public Parser, public KeepAlivePolicy,
00202 public ReadingPolicy,public ErrorHandlerPolicy, public ReadCompletionPolicy{
00203 unsigned int dummy;
00204 size_t block_size_;
00205 svec request_;
00206 size_t num_received_;
00207 public:
00208 read_callback(size_t block_size=4096):block_size_(block_size),request_(new vec(block_size)),num_received_(0){}
00209 ret_val operator()(int fd){
00210 if (num_received_+block_size_ > request_->size()) request_->resize(request_->size()+block_size_);
00211 int curr_received = read(fd, (*request_), num_received_);
00212 if (curr_received < 0 ) {
00213 if (would_block(errno)){
00214 return KEEP;
00215 }else{
00216 handle(errno);
00217 close(fd);
00218 return REMOVE;
00219 }
00220
00221
00222 }else{
00223 num_received_ += curr_received;
00224 vec::iterator request_end;
00225 if ((request_end = is_complete((*request_),num_received_,curr_received)) != request_->end()){
00226 parse(fd,request_,num_received_);
00227 return should_keep(request_,request_end,num_received_);
00228 }
00229 return KEEP;
00230 }
00231 }
00232 };
00233
namespace accepter {

/// Peer policy that ignores the address and descriptor of an accepted
/// connection.
struct discard_peer {
    /// Deliberately does nothing.
    void peer(const sockaddr_in & /*sa*/, int /*fd*/)
    {
    }
};

}
00240
00246 template <typename CallBack, typename Poller=poller, typename PeerPolicy = accepter::discard_peer>
00247 class accept_callback : public callback, protected PeerPolicy{
00248 Poller *poller_;
00249 short interest_;
00250 public:
00251 accept_callback(Poller *p, int i = READ):poller_(p),interest_(i){}
00254 ret_val operator()(int fd) {
00255 for(;;){
00256 sockaddr_in addr;
00257 socklen_t len = sizeof(sockaddr_in);;
00258 int conn_fd = accept(fd,(sockaddr *)&addr, &len);
00259 if (conn_fd < 0){
00260 if (would_block(errno)){
00261 break;
00262 }else{
00263 throw new exception ("unexpected error accepting",errno);
00264 }
00265 }else{
00266
00267 set_non_blocking(conn_fd);
00268 poller_->add(conn_fd, interest_, new CallBack());
00269 peer(addr,conn_fd);
00270 }
00271 }
00272
00273 return KEEP;
00274 }
00275
00276
00277 };
00278
00279
00280 }
00281
00282 #endif //CALLBACKS_H