UniSet  2.6.0
UNetReceiver.h
00001 /*
00002  * Copyright (c) 2015 Pavel Vainerman.
00003  *
00004  * This program is free software: you can redistribute it and/or modify
00005  * it under the terms of the GNU Lesser General Public License as
00006  * published by the Free Software Foundation, version 2.1.
00007  *
00008  * This program is distributed in the hope that it will be useful, but
00009  * WITHOUT ANY WARRANTY; without even the implied warranty of
00010  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
00011  * Lesser General Lesser Public License for more details.
00012  *
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this program. If not, see <http://www.gnu.org/licenses/>.
00015  */
00016 // -----------------------------------------------------------------------------
00017 #ifndef UNetReceiver_H_
00018 #define UNetReceiver_H_
00019 // -----------------------------------------------------------------------------
00020 #include <ostream>
00021 #include <memory>
00022 #include <string>
00023 #include <queue>
00024 #include <unordered_map>
00025 #include <sigc++/sigc++.h>
00026 #include <ev++.h>
00027 #include "UniSetObject.h"
00028 #include "Trigger.h"
00029 #include "Mutex.h"
00030 #include "SMInterface.h"
00031 #include "SharedMemory.h"
00032 #include "UDPPacket.h"
00033 #include "CommonEventLoop.h"
00034 #include "UDPCore.h"
00035 // --------------------------------------------------------------------------
00036 namespace uniset
00037 {
00038 // -----------------------------------------------------------------------------
00039 /*  Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
00040  * ===============
00041  * Собственно реализация сделана так:
00042  * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
00043  * что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
00044  * (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд",
00045  * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
00046  * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше..
00047  * Всё это реализовано в функции UNetReceiver::real_update()
00048  *
00049  * КЭШ
00050  * ===
00051  * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
00052  * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
00053  * Порядковый номер данных в пакете является индексом в кэше.
00054  * Для защиты от изменения последовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
00055  * ID который пришёл в пакете - элемент кэша обновляется.
00056  * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
00057  *
00058  * КЭШ (ДОПОЛНЕНИЕ)
00059  * ===
00060  * Т.к. в общем случае, данные могут быть разбиты не несколько (много) пакетов, то для каждого из них выделен свой кэш и создан отдельный
00061  * map, ключом в котором является идентификатор данных (см. UDPMessage::getDataID()).
00062  * Кэш в map добавляется когда приходит пакет с новым UDPMessage::getDataID() и в дальнейшим используется для этого пакета.
00063  * В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и расчитан на статичность пакетов,
00064  * т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов.
00065  *
00066  * Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
00067  * =========================================================================
00068  * Для защиты от сбоя счётика сделана следующая логика:
00069  * Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
00070  * что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
00071  * При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
00072  * в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
00073  * Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
00074  * и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
00075  * затирают старые, если их не успели вынуть и обработать.
00076  * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
00077  * =========================================================================
00078  * ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем..
00079  *
00080  * Создание соединения (открытие сокета)
00081  * ======================================
00082  * Попытка создать сокет производиться сразу в конструкторе, если это не получается,
00083  * то создаётся таймер (evCheckConnection), который периодически (checkConnectionTime) пытается вновь
00084  * открыть сокет.. и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
00085  * (в момент создания объекта UNetReceiver) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
00086  * ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
00087  * Если такая логика не требуется, то можно задать в конструкторе
00088  * последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет
00089  * выкинуто исключение при неудачной попытке создания соединения.
00090  *
00091  * Стратегия обновления данных в SM
00092  * ==================================
00093  * При помощи функции setUpdateStrategy() можно выбрать стратегию обновления данных в SM.
00094  * Поддерживается два варианта:
00095  * 'thread' - отдельный поток обновления
00096  * 'evloop' - использование общего с приёмом event loop (libev)
00097 */
00098 // -----------------------------------------------------------------------------
00099 class UNetReceiver:
00100     protected EvWatcher,
00101     public std::enable_shared_from_this<UNetReceiver>
00102 {
00103     public:
00104         UNetReceiver( const std::string& host, int port, const std::shared_ptr<SMInterface>& smi, bool nocheckConnection = false );
00105         virtual ~UNetReceiver();
00106 
00107         void start();
00108         void stop();
00109 
00110         inline const std::string getName() const
00111         {
00112             return myname;
00113         }
00114 
00115         // блокировать сохранение данных в SM
00116         void setLockUpdate( bool st ) noexcept;
00117         inline bool isLockUpdate() const noexcept
00118         {
00119             return lockUpdate;
00120         }
00121 
00122         void resetTimeout() noexcept;
00123 
00124         inline bool isRecvOK() const noexcept
00125         {
00126             return !ptRecvTimeout.checkTime();
00127         }
00128         inline size_t getLostPacketsNum() const noexcept
00129         {
00130             return lostPackets;
00131         }
00132 
00133         void setReceiveTimeout( timeout_t msec ) noexcept;
00134         void setReceivePause( timeout_t msec ) noexcept;
00135         void setUpdatePause( timeout_t msec ) noexcept;
00136         void setLostTimeout( timeout_t msec ) noexcept;
00137         void setPrepareTime( timeout_t msec ) noexcept;
00138         void setCheckConnectionPause( timeout_t msec ) noexcept;
00139         void setMaxDifferens( unsigned long set ) noexcept;
00140 
00141         void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept;
00142         void setLostPacketsID( uniset::ObjectId id ) noexcept;
00143 
00144         void setMaxProcessingCount( int set ) noexcept;
00145 
00146         void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись
00147 
00148         inline std::string getAddress() const noexcept
00149         {
00150             return addr;
00151         }
00152         inline int getPort() const noexcept
00153         {
00154             return port;
00155         }
00156 
00158         enum Event
00159         {
00160             evOK,        
00161             evTimeout    
00162         };
00163 
00164         typedef sigc::slot<void, const std::shared_ptr<UNetReceiver>&, Event> EventSlot;
00165         void connectEvent( EventSlot sl ) noexcept;
00166 
00167         // --------------------------------------------------------------------
00169         enum UpdateStrategy
00170         {
00171             useUpdateUnknown,
00172             useUpdateThread,    
00173             useUpdateEventLoop  
00174         };
00175 
00176         static UpdateStrategy strToUpdateStrategy( const std::string& s ) noexcept;
00177         static std::string to_string( UpdateStrategy s) noexcept;
00178 
00180         void setUpdateStrategy( UpdateStrategy set );
00181 
00182         // специальная обёртка, захватывающая или нет mutex в зависимости от стратегии
00183         // (т.к. при evloop mutex захватытвать не нужно)
00184         class pack_guard
00185         {
00186             public:
00187                 pack_guard( std::mutex& m, UpdateStrategy s );
00188                 ~pack_guard();
00189 
00190             protected:
00191                 std::mutex& m;
00192                 UpdateStrategy s;
00193         };
00194 
00195         // --------------------------------------------------------------------
00196 
00197         inline std::shared_ptr<DebugStream> getLog()
00198         {
00199             return unetlog;
00200         }
00201 
00202         virtual const std::string getShortInfo() const noexcept;
00203 
00204     protected:
00205 
00206         const std::shared_ptr<SMInterface> shm;
00207         std::shared_ptr<DebugStream> unetlog;
00208 
00209         bool receive() noexcept;
00210         void step() noexcept;
00211         void update() noexcept;
00212         void updateThread() noexcept;
00213         void callback( ev::io& watcher, int revents ) noexcept;
00214         void readEvent( ev::io& watcher ) noexcept;
00215         void updateEvent( ev::periodic& watcher, int revents ) noexcept;
00216         void checkConnectionEvent( ev::periodic& watcher, int revents ) noexcept;
00217         void statisticsEvent( ev::periodic& watcher, int revents ) noexcept;
00218         virtual void evprepare( const ev::loop_ref& eloop ) noexcept override;
00219         virtual void evfinish(const ev::loop_ref& eloop ) noexcept override;
00220         virtual std::string wname() const noexcept override
00221         {
00222             return myname;
00223         }
00224 
00225         void initIterators() noexcept;
00226         bool createConnection( bool throwEx = false );
00227         void checkConnection();
00228 
00229     public:
00230 
00231         // функция определения приоритетного сообщения для обработки
00232         struct PacketCompare:
00233             public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
00234         {
00235             inline bool operator()(const UniSetUDP::UDPMessage& lhs,
00236                                    const UniSetUDP::UDPMessage& rhs) const
00237             {
00238                 return lhs.num > rhs.num;
00239             }
00240         };
00241 
00242         typedef std::priority_queue<UniSetUDP::UDPMessage, std::vector<UniSetUDP::UDPMessage>, PacketCompare> PacketQueue;
00243 
00244     private:
00245         UNetReceiver();
00246 
00247         timeout_t recvpause = { 10 };      
00248         timeout_t updatepause = { 100 };    
00250         std::shared_ptr<UDPReceiveU> udp;
00251         std::string addr;
00252         int port = { 0 };
00253         Poco::Net::SocketAddress saddr;
00254         std::string myname;
00255         ev::io evReceive;
00256         ev::periodic evCheckConnection;
00257         ev::periodic evStatistic;
00258         ev::periodic evUpdate;
00259 
00260         UpdateStrategy upStrategy = { useUpdateEventLoop };
00261 
00262         // счётчики для подсчёта статистики
00263         size_t recvCount = { 0 };
00264         size_t upCount = { 0 };
00265 
00266         // текущая статистик
00267         size_t statRecvPerSec = { 0 }; 
00268         size_t statUpPerSec = { 0 };    
00270         std::shared_ptr< ThreadCreator<UNetReceiver> > upThread;    // update thread
00271 
00272         // делаем loop общим.. одним на всех!
00273         static CommonEventLoop loop;
00274 
00275         double checkConnectionTime = { 10.0 }; // sec
00276         std::mutex checkConnMutex;
00277 
00278         PassiveTimer ptRecvTimeout;
00279         PassiveTimer ptPrepare;
00280         timeout_t recvTimeout = { 5000 }; // msec
00281         timeout_t prepareTime = { 2000 };
00282         timeout_t lostTimeout = { 200 };
00283         PassiveTimer ptLostTimeout;
00284         size_t lostPackets = { 0 }; 
00286         uniset::ObjectId sidRespond = { uniset::DefaultObjectId };
00287         IOController::IOStateList::iterator itRespond;
00288         bool respondInvert = { false };
00289         uniset::ObjectId sidLostPackets;
00290         IOController::IOStateList::iterator itLostPackets;
00291 
00292         std::atomic_bool activated = { false };
00293 
00294         PacketQueue qpack;    
00295         UniSetUDP::UDPMessage pack;        
00296         UniSetUDP::UDPPacket r_buf;
00297         std::mutex packMutex; 
00298         size_t pnum = { 0 };    
00303         size_t maxDifferens = { 20 };
00304 
00305         PacketQueue qtmp;    
00306         bool waitClean = { false };    
00307         size_t rnum = { 0 };    
00309         size_t maxProcessingCount = { 100 }; 
00311         std::atomic_bool lockUpdate = { false }; 
00313         EventSlot slEvent;
00314         Trigger trTimeout;
00315         std::mutex tmMutex;
00316 
00317         struct CacheItem
00318         {
00319             long id = { uniset::DefaultObjectId };
00320             IOController::IOStateList::iterator ioit;
00321 
00322             CacheItem():
00323                 id(uniset::DefaultObjectId) {}
00324         };
00325 
00326         typedef std::vector<CacheItem> CacheVec;
00327         struct CacheInfo
00328         {
00329             CacheInfo():
00330                 cache_init_ok(false) {}
00331 
00332             bool cache_init_ok = { false };
00333             CacheVec cache;
00334         };
00335 
00336         // ключом является UDPMessage::getDataID()
00337         typedef std::unordered_map<long, CacheInfo> CacheMap;
00338         CacheMap d_icache_map;     
00339         CacheMap a_icache_map;     
00341         bool d_cache_init_ok = { false };
00342         bool a_cache_init_ok = { false };
00343 
00344         void initDCache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
00345         void initACache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
00346 };
00347 // --------------------------------------------------------------------------
00348 } // end of namespace uniset
00349 // -----------------------------------------------------------------------------
00350 #endif // UNetReceiver_H_
00351 // -----------------------------------------------------------------------------