swift
datalinkdbus.cpp
Go to the documentation of this file.
1 // SPDX-FileCopyrightText: Copyright (C) 2017 swift Project Community / Contributors
2 // SPDX-License-Identifier: GPL-3.0-or-later OR LicenseRef-swift-pilot-client-1
3 
5 
7 
8 #include <QDBusConnection>
9 #include <QDBusServiceWatcher>
10 
11 #include "misc/dbusserver.h"
17 
18 using namespace swift::misc::shared_state::dbus;
19 
21 {
22  CDataLinkDBus::CDataLinkDBus(QObject *parent) : QObject(parent), m_watchTimer(this)
23  {
24  connect(&m_watchTimer, &QTimer::timeout, this, &CDataLinkDBus::checkConnection);
25  m_watchTimer.setInterval(1000);
26  }
27 
29 
30  void CDataLinkDBus::overrideIdentifier(const CIdentifier &id) { m_identifier = id; }
31 
33  {
34  Q_ASSERT_X(!m_hub, Q_FUNC_INFO, "Already initialized");
35  m_hub = IHub::create(false, server, QDBusConnection("unused"), {}, this);
36  m_watchTimer.start();
37  checkConnection();
38  }
39 
40  void CDataLinkDBus::initializeRemote(const QDBusConnection &connection, const QString &service)
41  {
42  Q_ASSERT_X(!m_hub, Q_FUNC_INFO, "Already initialized");
43  m_hub = IHub::create(true, nullptr, connection, service, this);
44  m_watchTimer.start();
45  checkConnection();
46  }
47 
48  void CDataLinkDBus::checkConnection()
49  {
50  if (m_hub->isConnected()) { onConnected(); }
51  else { onDisconnected(); }
52  }
53 
54  void CDataLinkDBus::onConnected()
55  {
56  if (m_duplex) { return; }
57 
58  QFuture<void> ready;
59  std::tie(m_duplex, ready) = m_hub->getDuplex(m_identifier);
60  connect(m_duplex.get(), &IDuplex::eventPosted, this, &CDataLinkDBus::handlePeerEvent);
61  connect(m_duplex.get(), &IDuplex::peerSubscriptionsReceived, this, &CDataLinkDBus::setPeerSubscriptions);
62  connect(m_duplex.get(), &IDuplex::requestReceived, this, &CDataLinkDBus::handlePeerRequest);
63  doAfter(ready, m_duplex.get(), [this] {
64  m_duplex->requestPeerSubscriptions();
65  announceLocalSubscriptions();
66  setConnectionStatus(true);
67  });
68  }
69 
70  void CDataLinkDBus::onDisconnected()
71  {
72  m_duplex.reset();
73  setConnectionStatus(false);
74  }
75 
76  void CDataLinkDBus::handleLocalEvent(const QString &channel, const CVariant &param)
77  {
78  handlePeerEvent(channel, param);
79 
80  if (!m_duplex) { return; }
81 
82  for (const auto &filter : std::as_const(getChannel(channel).peerSubscriptions))
83  {
84  if (filter.matches(param))
85  {
86  m_duplex->postEvent(channel, param);
87  return;
88  }
89  }
90  }
91 
92  void CDataLinkDBus::handlePeerEvent(const QString &channel, const CVariant &param)
93  {
94  for (const auto &observerWeak : std::as_const(getChannel(channel).passiveObservers))
95  {
96  auto observer = observerWeak.lock();
97  if (observer && observer->eventSubscription().matches(param)) { observer->handleEvent(param); }
98  }
99  }
100 
101  void CDataLinkDBus::announceLocalSubscriptions()
102  {
103  for (const auto &channel : getChannelNames()) { announceLocalSubscriptions(channel); }
104  }
105 
106  void CDataLinkDBus::announceLocalSubscriptions(const QString &channel)
107  {
108  CVariantList filters;
109  for (const auto &observerWeak : std::as_const(getChannel(channel).passiveObservers))
110  {
111  auto observer = observerWeak.lock();
112  if (observer) { filters.push_back(observer->eventSubscription()); }
113  }
114  m_duplex->setSubscription(channel, filters);
115  }
116 
117  void CDataLinkDBus::setPeerSubscriptions(const QString &channel, const CVariantList &filters)
118  {
119  getChannel(channel).peerSubscriptions = filters;
120  }
121 
122  QFuture<CVariant> CDataLinkDBus::handleLocalRequest(const QString &channel, const CVariant &param)
123  {
124  auto mutator = getChannel(channel).activeMutator.lock();
125  if (mutator) { return mutator->handleRequest(param); }
126 
127  if (!m_duplex) { return {}; }
128 
129  return m_duplex->submitRequest(channel, param);
130  }
131 
132  void CDataLinkDBus::handlePeerRequest(const QString &channel, const CVariant &param, quint32 token)
133  {
134  auto mutator = getChannel(channel).activeMutator.lock();
135  if (mutator)
136  {
137  doAfter(mutator->handleRequest(param), this, [this, token](auto future) {
138  if (m_duplex) { m_duplex->reply(future.result(), token); }
139  });
140  }
141  }
142 
143  void CDataLinkDBus::publish(const CPassiveMutator *mutator)
144  {
145  connect(mutator, &CPassiveMutator::eventPosted, this,
146  [this, channel = getChannelName(mutator)](const CVariant &param) { handleLocalEvent(channel, param); });
147  }
148 
149  void CDataLinkDBus::publish(const CActiveMutator *mutator)
150  {
151  publish(static_cast<const CPassiveMutator *>(mutator));
152 
153  auto &channel = getChannel(mutator);
154  Q_ASSERT_X(!channel.activeMutator, Q_FUNC_INFO, "Tried to publish two active mutators on one channel");
155  channel.activeMutator = mutator->weakRef();
156 
157  if (m_duplex) { m_duplex->advertise(getChannelName(mutator)); }
158  connect(mutator, &QObject::destroyed, this, [this, channel = getChannelName(mutator)] {
159  if (m_duplex) { m_duplex->withdraw(channel); }
160  });
161  }
162 
163  void CDataLinkDBus::subscribe(const CPassiveObserver *observer)
164  {
165  getChannel(observer).passiveObservers.push_back(observer->weakRef());
166 
167  auto announce = [this, channel = getChannelName(observer)] {
168  if (m_duplex) { announceLocalSubscriptions(channel); }
169  };
170  connect(observer, &CPassiveObserver::eventSubscriptionChanged, this, announce);
171  connect(observer, &QObject::destroyed, this, announce);
172  announce();
173  }
174 
175  void CDataLinkDBus::subscribe(const CActiveObserver *observer)
176  {
177  subscribe(static_cast<const CPassiveObserver *>(observer));
178 
179  connect(observer, &CActiveObserver::requestPosted, this,
180  [this, channel = getChannelName(observer)](const CVariant &param, CPromise<CVariant> reply) {
181  reply.chainResult(handleLocalRequest(channel, param));
182  });
183  }
184 
185  QStringList CDataLinkDBus::getChannelNames() const
186  {
187  QMutexLocker lock(&m_channelsMutex);
188  return m_channels.keys();
189  }
190 
191  CDataLinkDBus::Channel &CDataLinkDBus::getChannel(const QString &name)
192  {
193  QMutexLocker lock(&m_channelsMutex);
194  return m_channels[name];
195  }
196 
197  CDataLinkDBus::Channel &CDataLinkDBus::getChannel(const QObject *object)
198  {
199  return getChannel(getChannelName(object));
200  }
201 } // namespace swift::misc::shared_state
Custom DBusServer.
Definition: dbusserver.h:34
Value object encapsulating information identifying a component of a modular distributed swift process...
Definition: identifier.h:29
A promise-based interface to QFuture, similar to std::promise for std::future.
Definition: promise.h:78
void chainResult(QFuture< U > future)
When the given future is ready, use its result to set the result of this promise.
Definition: promise.h:103
Wrapper around QVariant which provides transparent access to CValueObject methods of the contained object...
Definition: variant.h:66
Extends CPassiveMutator with the ability to respond to requests.
Definition: activemutator.h:25
QWeakPointer< const CActiveMutator > weakRef() const
Get a QWeakPointer pointing to this object.
Definition: activemutator.h:48
Extends CPassiveObserver with the ability to send requests and receive replies.
Endpoint which can emit events to subscribers.
Endpoint which can subscribe to events emitted by CPassiveMutator.
QWeakPointer< const CPassiveObserver > weakRef() const
Get a QWeakPointer pointing to this object.
virtual std::pair< QSharedPointer< IDuplex >, QFuture< bool > > getDuplex(const CIdentifier &)=0
Get a duplex object for the calling process.
virtual bool isConnected() const =0
Is connected?
Utilities for sharing state between multiple objects.
Definition: application.h:48
void doAfter(QFuture< T > future, QObject *context, F &&func)
Connect a slot or function to be invoked in the given context when a QFuture is finished.
Definition: promise.h:59