Free Electron
ZeroCatalog.h
Go to the documentation of this file.
1 /* Copyright (C) 2003-2021 Free Electron Organization
2  Any use of this software requires a license. If a valid license
3  was not distributed with this file, visit freeelectron.org. */
4 
5 /** @file */
6 
7 #ifndef __zeromq_ZeroCatalog_h__
8 #define __zeromq_ZeroCatalog_h__
9 
10 namespace fe
11 {
12 namespace ext
13 {
14 
15 /**************************************************************************//**
16  @brief ConnectedCatalog over ZeroMQ
17 
18  @ingroup zeromq
19 *//***************************************************************************/
20 class FE_DL_EXPORT ZeroCatalog:
21  public ConnectedCatalog,
22  public Initialize<ZeroCatalog>
23 {
24  public:
25 
26  ZeroCatalog(void);
27 virtual ~ZeroCatalog(void);
28 
29  void initialize(void);
30 
31  protected:
32 
33 virtual Result disconnect(void) override;
34 
35 virtual Result connectAsServer(String a_address,U16 a_port) override;
36 virtual Result connectAsClient(String a_address,U16 a_port) override;
37 
38  private:
39 
40 virtual void broadcastSelect(String a_name,String a_property,
41  String a_message,
42  I32 a_includeCount,const String* a_pIncludes,
43  I32 a_excludeCount,const String* a_pExcludes,
44  const U8* a_pRawBytes=NULL,I32 a_byteCount=0) override;
45 
46  void sendString(String a_textD);
47  void sendBytes(const U8* a_pByteBlock,I32 a_byteCountD);
48 
49  class FE_DL_EXPORT ReceiverTask: public Thread::Functor
50  {
51  public:
52  ReceiverTask(void);
53  virtual ~ReceiverTask(void);
54  virtual void operate(void);
55 
56  Result connectAsServer(String a_transport,
57  String a_address,U16 a_port);
58  Result connectAsClient(String a_transport,
59  String a_address,U16 a_port);
60  Result disconnect(void);
61 
62  I32 readBuffer(void** a_ppBuffer,zmq_msg_t* a_pMsg);
63  I32 readString(String& a_rString);
64  I32 readBytes(Array<U8>& a_rByteArray);
65  I32 readIdentity(Identity& a_identity);
66 
67  ZeroCatalog* m_pZeroCatalog;
68  String m_endpoint;
69  void* m_pContext;
70  void* m_pSocket;
71  std::atomic<bool> m_terminate;
72  };
73 
74  //* NOTE one recipient for N messages (using count)
75  class FE_DL_EXPORT Messages: public ObjectSafeShared<Messages>
76  {
77  public:
78  Messages(void):
79  m_available(FALSE) {}
80  Array<Identity> m_recipientArray;
81  Array<I32> m_subCountArray;
82  Array<zmq_msg_t> m_messageArray;
83  std::atomic<bool> m_available;
84  };
85 
86  Thread* m_pReceiverThread;
87  ReceiverTask m_receiverTask;
88  Messages m_messages;
89 };
90 
91 } /* namespace ext */
92 } /* namespace fe */
93 
94 #endif /* __zeromq_ZeroCatalog_h__ */
ConnectedCatalog over ZeroMQ.
Definition: ZeroCatalog.h:20
kernel
Definition: namespace.dox:3
Per-class participation in the Initialized <> mechanism.
Definition: Initialized.h:117
StateCatalog with connected mirroring.
Definition: ConnectedCatalog.h:70
Automatically reference-counted string container.
Definition: String.h:128
Result
Generalized return value indicating success or failure, and why.
Definition: Result.h:24
Object level locking for thread safety.
Definition: SafeShared.h:220