UniSet  2.6.0
UNetSender.h
00001 /*
00002  * Copyright (c) 2015 Pavel Vainerman.
00003  *
00004  * This program is free software: you can redistribute it and/or modify
00005  * it under the terms of the GNU Lesser General Public License as
00006  * published by the Free Software Foundation, version 2.1.
00007  *
00008  * This program is distributed in the hope that it will be useful, but
00009  * WITHOUT ANY WARRANTY; without even the implied warranty of
00010  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
00011  * Lesser General Lesser Public License for more details.
00012  *
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this program. If not, see <http://www.gnu.org/licenses/>.
00015  */
00016 // -----------------------------------------------------------------------------
00017 #ifndef UNetSender_H_
00018 #define UNetSender_H_
00019 // -----------------------------------------------------------------------------
00020 #include <ostream>
00021 #include <string>
00022 #include <vector>
00023 #include <unordered_map>
00024 #include "UniSetObject.h"
00025 #include "Trigger.h"
00026 #include "Mutex.h"
00027 #include "SMInterface.h"
00028 #include "SharedMemory.h"
00029 #include "ThreadCreator.h"
00030 #include "UDPCore.h"
00031 #include "UDPPacket.h"
00032 // --------------------------------------------------------------------------
00033 namespace uniset
00034 {
00035 // -----------------------------------------------------------------------------
00036 /*
00037  *    Распределение датчиков по пакетам
00038  *    =========================================================================
00039  *    Все пересылаемые данные разбиваются на группы по частоте посылки("sendfactor").
00040  *    Частота посылки кратна sendpause, задаётся для каждого датчика, при помощи свойства prefix_sendfactor.
00041  *    Внутри каждой группы пакеты набираются по мере "заполнения".
00042  *
00043  *    Добавление датчика в пакет и создание нового пакета при переполнении происходит в функции initItem().
00044  *    Причем так как дискретные и аналоговые датчики обрабатываются отдельно (но пересылаются в одном пакете),
00045  *    то датчики, которые первые переполнятся приводят к тому, что создаётся новый пакет и они добавляются в него,
00046  *    в свою очередь остальные продолжают "добивать" предыдущий пакет.
00047  *    В initItem() каждому UItem в dlist кроме pack_ind присваивается еще и номер пакета pack_num, который гарантировано соответствует
00048  *    существующему пакету, поэтому в дальнейшем при использовании pack_num в качестве ключа в mypacks мы не проверяем пакет на существование.
00049  *
00050  *    ОПТИМИЗАЦИЯ N1: Для оптимизации обработки посылаемых пакетов (на стороне UNetSender) сделана следующая логика:
00051  *                  Номер очередного посылаемого пакета меняется (увеличивается) только, если изменились данные с момента
00052                     последней посылки. Для этого по данным каждый раз производится расчёт UNetUDP::makeCRC() и сравнивается с последним.
00053                     На стороне UNetReceiver пакаеты с повторными номерами (т.е. уже обработанные) - откидываются.
00054  *
00055  *
00056  * Создание соединения
00057  * ======================================
00058  * Попытка создать соединение производиться сразу в конструкторе, если это не получается,
00059  * то в потоке "посылки", с заданным периодом (checkConnectionTime) идёт попытка создать соединение..
00060  * и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
00061  * (в момент создания объекта UNetSender) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
00062  * ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
00063  * Если такая логика не требуется, то можно задать в конструкторе флаг nocheckconnection=true,
00064  * тогда при создании объекта UNetSender, в конструкторе будет
00065  * выкинуто исключение при неудачной попытке создания соединения.
00066  * \warning setCheckConnectionPause(msec) должно быть кратно sendpause!
00067  */
00068 class UNetSender
00069 {
00070     public:
00071         UNetSender( const std::string& host, const int port, const std::shared_ptr<SMInterface>& smi, bool nocheckConnection = false,
00072                     const std::string& s_field = "", const std::string& s_fvalue = "", const std::string& prefix = "unet",
00073                     size_t maxDCount = UniSetUDP::MaxDCount, size_t maxACount = UniSetUDP::MaxACount );
00074 
00075         virtual ~UNetSender();
00076 
00077         typedef size_t sendfactor_t;
00078 
00079         struct UItem
00080         {
00081             UItem():
00082                 iotype(UniversalIO::UnknownIOType),
00083                 id(uniset::DefaultObjectId),
00084                 pack_num(0),
00085                 pack_ind(0),
00086                 pack_sendfactor(0) {}
00087 
00088             UniversalIO::IOType iotype;
00089             uniset::ObjectId id;
00090             IOController::IOStateList::iterator ioit;
00091             size_t pack_num;
00092             size_t pack_ind;
00093             sendfactor_t pack_sendfactor = { 0 };
00094             friend std::ostream& operator<<( std::ostream& os, UItem& p );
00095         };
00096 
00097         typedef std::unordered_map<uniset::ObjectId, UItem> UItemMap;
00098 
00099         size_t getDataPackCount() const;
00100 
00101         void start();
00102         void stop();
00103 
00104         void send() noexcept;
00105 
00106         struct PackMessage
00107         {
00108             PackMessage( UniSetUDP::UDPMessage&& m ) noexcept: msg(std::move(m)) {}
00109             PackMessage( const UniSetUDP::UDPMessage& m ) = delete;
00110 
00111             PackMessage() noexcept {}
00112 
00113             UniSetUDP::UDPMessage msg;
00114             uniset::uniset_rwmutex mut;
00115         };
00116 
00117         void real_send( PackMessage& mypack ) noexcept;
00118 
00120         void updateFromSM();
00121 
00123         void updateSensor( uniset::ObjectId id, long value );
00124 
00126         void updateItem( UItem& it, long value );
00127 
00128         inline void setSendPause( int msec )
00129         {
00130             sendpause = msec;
00131         }
00132         inline void setPackSendPause( int msec )
00133         {
00134             packsendpause = msec;
00135         }
00136 
00137         void setCheckConnectionPause( int msec );
00138 
00140         void askSensors( UniversalIO::UIOCommand cmd );
00141 
00143         void initIterators();
00144 
00145         inline std::shared_ptr<DebugStream> getLog()
00146         {
00147             return unetlog;
00148         }
00149 
00150         virtual const std::string getShortInfo() const;
00151 
00152         inline std::string getAddress() const
00153         {
00154             return addr;
00155         }
00156         inline int getPort() const
00157         {
00158             return port;
00159         }
00160 
00161         inline size_t getADataSize() const
00162         {
00163             return maxAData;
00164         }
00165         inline size_t getDDataSize() const
00166         {
00167             return maxDData;
00168         }
00169 
00170     protected:
00171 
00172         std::string s_field = { "" };
00173         std::string s_fvalue = { "" };
00174         std::string prefix = { "" };
00175 
00176         const std::shared_ptr<SMInterface> shm;
00177         std::shared_ptr<DebugStream> unetlog;
00178 
00179         bool initItem( UniXML::iterator& it );
00180         bool readItem( const std::shared_ptr<UniXML>& xml, UniXML::iterator& it, xmlNode* sec );
00181 
00182         void readConfiguration();
00183 
00184         bool createConnection( bool throwEx );
00185 
00186     private:
00187         UNetSender();
00188 
00189         std::shared_ptr<UDPSocketU> udp = { nullptr };
00190         std::string addr;
00191         int port = { 0 };
00192         std::string s_host = { "" };
00193         Poco::Net::SocketAddress saddr;
00194 
00195         std::string myname = { "" };
00196         timeout_t sendpause = { 150 };
00197         timeout_t packsendpause = { 5 };
00198         timeout_t writeTimeout = { 1000 }; // msec
00199         std::atomic_bool activated = { false };
00200         PassiveTimer ptCheckConnection;
00201 
00202         typedef std::unordered_map<sendfactor_t, std::vector<PackMessage>> Packs;
00203 
00204         // mypacks заполняется в начале и дальше с ним происходит только чтение
00205         // поэтому mutex-ом его не защищаем
00206         Packs mypacks;
00207         std::unordered_map<sendfactor_t, size_t> packs_anum;
00208         std::unordered_map<sendfactor_t, size_t> packs_dnum;
00209         UItemMap items;
00210         size_t packetnum = { 1 }; 
00211         uint16_t lastcrc = { 0 };
00212         UniSetUDP::UDPPacket s_msg;
00213 
00214         size_t maxAData = { UniSetUDP::MaxACount };
00215         size_t maxDData = { UniSetUDP::MaxDCount };
00216 
00217         std::shared_ptr< ThreadCreator<UNetSender> > s_thr;    // send thread
00218 
00219         size_t ncycle = { 0 }; 
00221 };
00222 // --------------------------------------------------------------------------
00223 } // end of namespace uniset
00224 // -----------------------------------------------------------------------------
00225 #endif // UNetSender_H_
00226 // -----------------------------------------------------------------------------