|
UniSet
2.6.0
|
00001 /* 00002 * Copyright (c) 2015 Pavel Vainerman. 00003 * 00004 * This program is free software: you can redistribute it and/or modify 00005 * it under the terms of the GNU Lesser General Public License as 00006 * published by the Free Software Foundation, version 2.1. 00007 * 00008 * This program is distributed in the hope that it will be useful, but 00009 * WITHOUT ANY WARRANTY; without even the implied warranty of 00010 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00011 * Lesser General Lesser Public License for more details. 00012 * 00013 * You should have received a copy of the GNU Lesser General Public License 00014 * along with this program. If not, see <http://www.gnu.org/licenses/>. 00015 */ 00016 // ----------------------------------------------------------------------------- 00017 #ifndef UNetReceiver_H_ 00018 #define UNetReceiver_H_ 00019 // ----------------------------------------------------------------------------- 00020 #include <ostream> 00021 #include <memory> 00022 #include <string> 00023 #include <queue> 00024 #include <unordered_map> 00025 #include <sigc++/sigc++.h> 00026 #include <ev++.h> 00027 #include "UniSetObject.h" 00028 #include "Trigger.h" 00029 #include "Mutex.h" 00030 #include "SMInterface.h" 00031 #include "SharedMemory.h" 00032 #include "UDPPacket.h" 00033 #include "CommonEventLoop.h" 00034 #include "UDPCore.h" 00035 // -------------------------------------------------------------------------- 00036 namespace uniset 00037 { 00038 // ----------------------------------------------------------------------------- 00039 /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP. 00040 * =============== 00041 * Собственно реализация сделана так: 00042 * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности 00043 * что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета 00044 * (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд", 00045 * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout) 00046 * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше.. 00047 * Всё это реализовано в функции UNetReceiver::real_update() 00048 * 00049 * КЭШ 00050 * === 00051 * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов. 00052 * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо). 00053 * Порядковый номер данных в пакете является индексом в кэше. 00054 * Для защиты от изменения последовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем, 00055 * ID который пришёл в пакете - элемент кэша обновляется. 00056 * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется. 00057 * 00058 * КЭШ (ДОПОЛНЕНИЕ) 00059 * === 00060 * Т.к. в общем случае, данные могут быть разбиты не несколько (много) пакетов, то для каждого из них выделен свой кэш и создан отдельный 00061 * map, ключом в котором является идентификатор данных (см. UDPMessage::getDataID()). 00062 * Кэш в map добавляется когда приходит пакет с новым UDPMessage::getDataID() и в дальнейшим используется для этого пакета. 00063 * В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и расчитан на статичность пакетов, 00064 * т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов. 00065 * 00066 * Обработка сбоя или переполнения счётчика пакетов(перехода через максимум) 00067 * ========================================================================= 00068 * Для защиты от сбоя счётика сделана следующая логика: 00069 * Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается, 00070 * что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью. 00071 * При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет, 00072 * в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка. 00073 * Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься 00074 * и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные 00075 * затирают старые, если их не успели вынуть и обработать. 00076 * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди. 00077 * ========================================================================= 00078 * ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем.. 00079 * 00080 * Создание соединения (открытие сокета) 00081 * ====================================== 00082 * Попытка создать сокет производиться сразу в конструкторе, если это не получается, 00083 * то создаётся таймер (evCheckConnection), который периодически (checkConnectionTime) пытается вновь 00084 * открыть сокет.. и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы 00085 * (в момент создания объекта UNetReceiver) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется 00086 * ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть). 00087 * Если такая логика не требуется, то можно задать в конструкторе 00088 * последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет 00089 * выкинуто исключение при неудачной попытке создания соединения. 00090 * 00091 * Стратегия обновления данных в SM 00092 * ================================== 00093 * При помощи функции setUpdateStrategy() можно выбрать стратегию обновления данных в SM. 00094 * Поддерживается два варианта: 00095 * 'thread' - отдельный поток обновления 00096 * 'evloop' - использование общего с приёмом event loop (libev) 00097 */ 00098 // ----------------------------------------------------------------------------- 00099 class UNetReceiver: 00100 protected EvWatcher, 00101 public std::enable_shared_from_this<UNetReceiver> 00102 { 00103 public: 00104 UNetReceiver( const std::string& host, int port, const std::shared_ptr<SMInterface>& smi, bool nocheckConnection = false ); 00105 virtual ~UNetReceiver(); 00106 00107 void start(); 00108 void stop(); 00109 00110 inline const std::string getName() const 00111 { 00112 return myname; 00113 } 00114 00115 // блокировать сохранение данных в SM 00116 void setLockUpdate( bool st ) noexcept; 00117 inline bool isLockUpdate() const noexcept 00118 { 00119 return lockUpdate; 00120 } 00121 00122 void resetTimeout() noexcept; 00123 00124 inline bool isRecvOK() const noexcept 00125 { 00126 return !ptRecvTimeout.checkTime(); 00127 } 00128 inline size_t getLostPacketsNum() const noexcept 00129 { 00130 return lostPackets; 00131 } 00132 00133 void setReceiveTimeout( timeout_t msec ) noexcept; 00134 void setReceivePause( timeout_t msec ) noexcept; 00135 void setUpdatePause( timeout_t msec ) noexcept; 00136 void setLostTimeout( timeout_t msec ) noexcept; 00137 void setPrepareTime( timeout_t msec ) noexcept; 00138 void setCheckConnectionPause( timeout_t msec ) noexcept; 00139 void setMaxDifferens( unsigned long set ) noexcept; 00140 00141 void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept; 00142 void setLostPacketsID( uniset::ObjectId id ) noexcept; 00143 00144 void setMaxProcessingCount( int set ) noexcept; 00145 00146 void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись 00147 00148 inline std::string getAddress() const noexcept 00149 { 00150 return addr; 00151 } 00152 inline int getPort() const noexcept 00153 { 00154 return port; 00155 } 00156 00158 enum Event 00159 { 00160 evOK, 00161 evTimeout 00162 }; 00163 00164 typedef sigc::slot<void, const std::shared_ptr<UNetReceiver>&, Event> EventSlot; 00165 void connectEvent( EventSlot sl ) noexcept; 00166 00167 // -------------------------------------------------------------------- 00169 enum UpdateStrategy 00170 { 00171 useUpdateUnknown, 00172 useUpdateThread, 00173 useUpdateEventLoop 00174 }; 00175 00176 static UpdateStrategy strToUpdateStrategy( const std::string& s ) noexcept; 00177 static std::string to_string( UpdateStrategy s) noexcept; 00178 00180 void setUpdateStrategy( UpdateStrategy set ); 00181 00182 // специальная обёртка, захватывающая или нет mutex в зависимости от стратегии 00183 // (т.к. при evloop mutex захватытвать не нужно) 00184 class pack_guard 00185 { 00186 public: 00187 pack_guard( std::mutex& m, UpdateStrategy s ); 00188 ~pack_guard(); 00189 00190 protected: 00191 std::mutex& m; 00192 UpdateStrategy s; 00193 }; 00194 00195 // -------------------------------------------------------------------- 00196 00197 inline std::shared_ptr<DebugStream> getLog() 00198 { 00199 return unetlog; 00200 } 00201 00202 virtual const std::string getShortInfo() const noexcept; 00203 00204 protected: 00205 00206 const std::shared_ptr<SMInterface> shm; 00207 std::shared_ptr<DebugStream> unetlog; 00208 00209 bool receive() noexcept; 00210 void step() noexcept; 00211 void update() noexcept; 00212 void updateThread() noexcept; 00213 void callback( ev::io& watcher, int revents ) noexcept; 00214 void readEvent( ev::io& watcher ) noexcept; 00215 void updateEvent( ev::periodic& watcher, int revents ) noexcept; 00216 void checkConnectionEvent( ev::periodic& watcher, int revents ) noexcept; 00217 void statisticsEvent( ev::periodic& watcher, int revents ) noexcept; 00218 virtual void evprepare( const ev::loop_ref& eloop ) noexcept override; 00219 virtual void evfinish(const ev::loop_ref& eloop ) noexcept override; 00220 virtual std::string wname() const noexcept override 00221 { 00222 return myname; 00223 } 00224 00225 void initIterators() noexcept; 00226 bool createConnection( bool throwEx = false ); 00227 void checkConnection(); 00228 00229 public: 00230 00231 // функция определения приоритетного сообщения для обработки 00232 struct PacketCompare: 00233 public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool> 00234 { 00235 inline bool operator()(const UniSetUDP::UDPMessage& lhs, 00236 const UniSetUDP::UDPMessage& rhs) const 00237 { 00238 return lhs.num > rhs.num; 00239 } 00240 }; 00241 00242 typedef std::priority_queue<UniSetUDP::UDPMessage, std::vector<UniSetUDP::UDPMessage>, PacketCompare> PacketQueue; 00243 00244 private: 00245 UNetReceiver(); 00246 00247 timeout_t recvpause = { 10 }; 00248 timeout_t updatepause = { 100 }; 00250 std::shared_ptr<UDPReceiveU> udp; 00251 std::string addr; 00252 int port = { 0 }; 00253 Poco::Net::SocketAddress saddr; 00254 std::string myname; 00255 ev::io evReceive; 00256 ev::periodic evCheckConnection; 00257 ev::periodic evStatistic; 00258 ev::periodic evUpdate; 00259 00260 UpdateStrategy upStrategy = { useUpdateEventLoop }; 00261 00262 // счётчики для подсчёта статистики 00263 size_t recvCount = { 0 }; 00264 size_t upCount = { 0 }; 00265 00266 // текущая статистик 00267 size_t statRecvPerSec = { 0 }; 00268 size_t statUpPerSec = { 0 }; 00270 std::shared_ptr< ThreadCreator<UNetReceiver> > upThread; // update thread 00271 00272 // делаем loop общим.. одним на всех! 00273 static CommonEventLoop loop; 00274 00275 double checkConnectionTime = { 10.0 }; // sec 00276 std::mutex checkConnMutex; 00277 00278 PassiveTimer ptRecvTimeout; 00279 PassiveTimer ptPrepare; 00280 timeout_t recvTimeout = { 5000 }; // msec 00281 timeout_t prepareTime = { 2000 }; 00282 timeout_t lostTimeout = { 200 }; 00283 PassiveTimer ptLostTimeout; 00284 size_t lostPackets = { 0 }; 00286 uniset::ObjectId sidRespond = { uniset::DefaultObjectId }; 00287 IOController::IOStateList::iterator itRespond; 00288 bool respondInvert = { false }; 00289 uniset::ObjectId sidLostPackets; 00290 IOController::IOStateList::iterator itLostPackets; 00291 00292 std::atomic_bool activated = { false }; 00293 00294 PacketQueue qpack; 00295 UniSetUDP::UDPMessage pack; 00296 UniSetUDP::UDPPacket r_buf; 00297 std::mutex packMutex; 00298 size_t pnum = { 0 }; 00303 size_t maxDifferens = { 20 }; 00304 00305 PacketQueue qtmp; 00306 bool waitClean = { false }; 00307 size_t rnum = { 0 }; 00309 size_t maxProcessingCount = { 100 }; 00311 std::atomic_bool lockUpdate = { false }; 00313 EventSlot slEvent; 00314 Trigger trTimeout; 00315 std::mutex tmMutex; 00316 00317 struct CacheItem 00318 { 00319 long id = { uniset::DefaultObjectId }; 00320 IOController::IOStateList::iterator ioit; 00321 00322 CacheItem(): 00323 id(uniset::DefaultObjectId) {} 00324 }; 00325 00326 typedef std::vector<CacheItem> CacheVec; 00327 struct CacheInfo 00328 { 00329 CacheInfo(): 00330 cache_init_ok(false) {} 00331 00332 bool cache_init_ok = { false }; 00333 CacheVec cache; 00334 }; 00335 00336 // ключом является UDPMessage::getDataID() 00337 typedef std::unordered_map<long, CacheInfo> CacheMap; 00338 CacheMap d_icache_map; 00339 CacheMap a_icache_map; 00341 bool d_cache_init_ok = { false }; 00342 bool a_cache_init_ok = { false }; 00343 00344 void initDCache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept; 00345 void initACache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept; 00346 }; 00347 // -------------------------------------------------------------------------- 00348 } // end of namespace uniset 00349 // ----------------------------------------------------------------------------- 00350 #endif // UNetReceiver_H_ 00351 // -----------------------------------------------------------------------------
1.7.6.1