Free Electron
SharedMemCatalog.h
Go to the documentation of this file.
1 /* Copyright (C) 2003-2021 Free Electron Organization
2  Any use of this software requires a license. If a valid license
3  was not distributed with this file, visit freeelectron.org. */
4 
5 /** @file */
6 
7 #ifndef __shm_SharedMem_h__
8 #define __shm_SharedMem_h__
9 
10 #define FE_SMC_COMM_COUNT 8
11 #define FE_SMC_COMMAND_BYTES 1024
12 
13 namespace fe
14 {
15 namespace ext
16 {
17 
18 /**************************************************************************//**
19  @brief ConnectedCatalog over ZeroMQ
20 
21  @ingroup shm
22 *//***************************************************************************/
23 class FE_DL_EXPORT SharedMemCatalog:
24  public ConnectedCatalog,
25  public Initialize<SharedMemCatalog>
26 {
27  public:
28 
29  SharedMemCatalog(void);
30 virtual ~SharedMemCatalog(void);
31 
32  void initialize(void);
33 
34  protected:
35 
36 virtual Result disconnect(void) override;
37 virtual Result dropConnection(void);
38 
39  using ConnectedCatalog::removeIdentity;
40 virtual Result removeIdentity(I32 a_index) override;
41 
42 virtual Result connectAsServer(String a_address,U16 a_port) override;
43 virtual Result connectAsClient(String a_address,U16 a_port) override;
44 
45 virtual Result catchUp(Identity& a_rIdentity);
46 
47  private:
48 
49 virtual void broadcastSelect(String a_name,String a_property,
50  String a_message,
51  I32 a_includeCount,const String* a_pIncludes,
52  I32 a_excludeCount,const String* a_pExcludes,
53  const U8* a_pRawBytes=NULL,I32 a_byteCount=0) override;
54 
55  struct Connect
56  {
57  public:
58  sem_t m_semStart;
59  sem_t m_semClient;
60  sem_t m_semIdentity;
61  sem_t m_semAck;
62  Identity m_identity;
63  };
64 
65  struct Comm
66  {
67  public:
68  sem_t m_semWrite; //* last reader should post
69  sem_t m_semRead; //* one per intended recipient
70  sem_t m_semCount; //* for m_readerCount and m_serial
71  I32 m_readerCount;
72  I32 m_serial;
73 
74  char m_include[32];
75  char m_exclude[32];
76  char m_command[8];
77  char m_key[64];
78  char m_property[32];
79  char m_type[32];
80  I32 m_byteCount;
81  U8 m_pData[FE_SMC_COMMAND_BYTES];
82  };
83  struct Share
84  {
85  public:
86  Connect m_connect;
87  Comm m_serverComm[FE_SMC_COMM_COUNT];
88  Comm m_clientComm[FE_SMC_COMM_COUNT];
89  };
90  class Channel
91  {
92  public:
93  Channel(void):
94  m_pShare(NULL),
95  m_serverIndex(0),
96  m_clientIndex(0),
97  m_sendSerial(0),
98  m_receiveSerial(-1),
99  m_discarded(FALSE) {}
100  virtual ~Channel(void)
101  {
102  if(m_pShare)
103  munmap(m_pShare,sizeof(Share));
104  }
105  void reset(void)
106  {
107  if(m_pShare)
108  munmap(m_pShare,sizeof(Share));
109  m_pShare=NULL;
110  m_serverIndex=0;
111  m_clientIndex=0;
112  m_sendSerial=0;
113  m_receiveSerial=-1;
114  m_discarded=FALSE;
115  }
116 
117  Share* m_pShare;
118  std::deque<Comm> m_commBacklog;
119  I32 m_serverIndex;
120  I32 m_clientIndex;
121  I32 m_sendSerial;
122  I32 m_receiveSerial;
123  BWORD m_discarded;
124  };
125 
126  //* TODO additional clients need to know what serverIndex to start with
127 
128  class FE_DL_EXPORT ReceiverTask: public Thread::Functor
129  {
130  public:
131  ReceiverTask(void);
132  virtual ~ReceiverTask(void);
133 
134  void reset(void);
135 
136  virtual void operate(void);
137 
138  Result connectAsServer(String a_transport,
139  String a_address,U16 a_port);
140  Result connectAsClient(String a_transport,
141  String a_address,U16 a_port);
142 
143  Result startShare(String a_shareName,Share*& a_rpShare,
144  BWORD a_create);
145 
146  I32 writeBacklogs(void);
147  I32 writeBacklog(Channel& a_rChannel);
148 
149  BWORD receiveUpdate(const String& a_source,
150  SharedMemCatalog::Channel& a_rChannel,
151  SharedMemCatalog::Comm& a_rComm);
152 
153  void decrementReaderCounts(void);
154  BWORD decrementReaderCount(SharedMemCatalog::Comm& a_rComm);
155  void dumpSemaphores(SharedMemCatalog::Channel& a_rChannel);
156 
157  SharedMemCatalog* m_pSharedMemCatalog;
158  String m_endpoint;
159  std::atomic<bool> m_terminate;
160  std::atomic<I32> m_readerDecrementCount;
161  Identity m_identity;
162  Channel m_publicChannel;
163  AutoHashMap<String,Channel> m_privateChannelMap;
164  BWORD m_receivePublic;
165  BWORD m_isServer;
166  };
167 
168  BWORD m_usePublicChannel;
169  Thread* m_pReceiverThread;
170  ReceiverTask m_receiverTask;
171 };
172 
173 } /* namespace ext */
174 } /* namespace fe */
175 
176 #endif /* __shm_SharedMem_h__ */
kernel
Definition: namespace.dox:3
Per-class participation in the Initialized <> mechanism.
Definition: Initialized.h:117
StateCatalog with connected mirroring.
Definition: ConnectedCatalog.h:70
ConnectedCatalog over ZeroMQ.
Definition: SharedMemCatalog.h:23
Automatically reference-counted string container.
Definition: String.h:128
Result
Generalized return value indicating success or failure, and why.
Definition: Result.h:24