8 #include <QDBusConnection>
9 #include <QDBusServiceWatcher>
18 using namespace swift::misc::shared_state::dbus;
24 connect(&m_watchTimer, &QTimer::timeout,
this, &CDataLinkDBus::checkConnection);
25 m_watchTimer.setInterval(1000);
34 Q_ASSERT_X(!m_hub, Q_FUNC_INFO,
"Already initialized");
35 m_hub = IHub::create(
false, server, QDBusConnection(
"unused"), {},
this);
42 Q_ASSERT_X(!m_hub, Q_FUNC_INFO,
"Already initialized");
43 m_hub = IHub::create(
true,
nullptr, connection, service,
this);
48 void CDataLinkDBus::checkConnection()
51 else { onDisconnected(); }
54 void CDataLinkDBus::onConnected()
56 if (m_duplex) {
return; }
59 std::tie(m_duplex, ready) = m_hub->
getDuplex(m_identifier);
60 connect(m_duplex.get(), &IDuplex::eventPosted,
this, &CDataLinkDBus::handlePeerEvent);
61 connect(m_duplex.get(), &IDuplex::peerSubscriptionsReceived,
this, &CDataLinkDBus::setPeerSubscriptions);
62 connect(m_duplex.get(), &IDuplex::requestReceived,
this, &CDataLinkDBus::handlePeerRequest);
63 doAfter(ready, m_duplex.get(), [
this] {
64 m_duplex->requestPeerSubscriptions();
65 announceLocalSubscriptions();
66 setConnectionStatus(true);
70 void CDataLinkDBus::onDisconnected()
76 void CDataLinkDBus::handleLocalEvent(
const QString &channel,
const CVariant ¶m)
78 handlePeerEvent(channel, param);
80 if (!m_duplex) {
return; }
82 for (
const auto &filter : std::as_const(getChannel(channel).peerSubscriptions))
84 if (filter.matches(param))
86 m_duplex->postEvent(channel, param);
92 void CDataLinkDBus::handlePeerEvent(
const QString &channel,
const CVariant ¶m)
94 for (
const auto &observerWeak : std::as_const(getChannel(channel).passiveObservers))
96 auto observer = observerWeak.lock();
97 if (observer && observer->eventSubscription().matches(param)) { observer->handleEvent(param); }
101 void CDataLinkDBus::announceLocalSubscriptions()
103 for (
const auto &channel : getChannelNames()) { announceLocalSubscriptions(channel); }
106 void CDataLinkDBus::announceLocalSubscriptions(
const QString &channel)
108 CVariantList filters;
109 for (
const auto &observerWeak : std::as_const(getChannel(channel).passiveObservers))
111 auto observer = observerWeak.lock();
112 if (observer) { filters.push_back(observer->eventSubscription()); }
114 m_duplex->setSubscription(channel, filters);
117 void CDataLinkDBus::setPeerSubscriptions(
const QString &channel,
const CVariantList &filters)
119 getChannel(channel).peerSubscriptions = filters;
122 QFuture<CVariant> CDataLinkDBus::handleLocalRequest(
const QString &channel,
const CVariant ¶m)
124 auto mutator = getChannel(channel).activeMutator.lock();
125 if (mutator) {
return mutator->handleRequest(param); }
127 if (!m_duplex) {
return {}; }
129 return m_duplex->submitRequest(channel, param);
132 void CDataLinkDBus::handlePeerRequest(
const QString &channel,
const CVariant ¶m, quint32 token)
134 auto mutator = getChannel(channel).activeMutator.lock();
137 doAfter(mutator->handleRequest(param),
this, [
this, token](
auto future) {
138 if (m_duplex) { m_duplex->reply(future.result(), token); }
145 connect(mutator, &CPassiveMutator::eventPosted,
this,
146 [
this, channel = getChannelName(mutator)](
const CVariant ¶m) { handleLocalEvent(channel, param); });
153 auto &channel = getChannel(mutator);
154 Q_ASSERT_X(!channel.activeMutator, Q_FUNC_INFO,
"Tried to publish two active mutators on one channel");
155 channel.activeMutator = mutator->
weakRef();
157 if (m_duplex) { m_duplex->advertise(getChannelName(mutator)); }
158 connect(mutator, &QObject::destroyed,
this, [
this, channel = getChannelName(mutator)] {
159 if (m_duplex) { m_duplex->withdraw(channel); }
165 getChannel(observer).passiveObservers.push_back(observer->
weakRef());
167 auto announce = [
this, channel = getChannelName(observer)] {
168 if (m_duplex) { announceLocalSubscriptions(channel); }
170 connect(observer, &CPassiveObserver::eventSubscriptionChanged,
this, announce);
171 connect(observer, &QObject::destroyed,
this, announce);
179 connect(observer, &CActiveObserver::requestPosted,
this,
181 reply.
chainResult(handleLocalRequest(channel, param));
185 QStringList CDataLinkDBus::getChannelNames()
const
187 QMutexLocker lock(&m_channelsMutex);
188 return m_channels.keys();
191 CDataLinkDBus::Channel &CDataLinkDBus::getChannel(
const QString &name)
193 QMutexLocker lock(&m_channelsMutex);
194 return m_channels[name];
197 CDataLinkDBus::Channel &CDataLinkDBus::getChannel(
const QObject *
object)
199 return getChannel(getChannelName(
object));
Value object encapsulating information identifying a component of a modular distributed swift process...
A promise-based interface to QFuture, similar to std::promise for std::future.
void chainResult(QFuture< U > future)
When the given future is ready, use its result to set the result of this promise.
Wrapper around QVariant which provides transparent access to CValueObject methods of the contained ob...
Extends CPassiveMutator with the ability to respond to requests.
QWeakPointer< const CActiveMutator > weakRef() const
Get a QWeakPointer pointing to this object.
Extends CPassiveObserver with the ability to send requests and receive replies.
void initializeRemote(const QDBusConnection &connection, const QString &service)
Initialize on client side.
void initializeLocal(CDBusServer *server=nullptr)
Initialize on server side.
void overrideIdentifier(const CIdentifier &)
Override identifier for testing purposes.
virtual ~CDataLinkDBus()
Destructor.
CDataLinkDBus(QObject *parent=nullptr)
Constructor.
Endpoint which can emit events to subscribers.
Endpoint which can subscribe to events emitted by CPassiveMutator.
QWeakPointer< const CPassiveObserver > weakRef() const
Get a QWeakPointer pointing to this object.
void setConnectionStatus(bool connected)
Set the connection status visible through the watcher.
virtual std::pair< QSharedPointer< IDuplex >, QFuture< bool > > getDuplex(const CIdentifier &)=0
Get a duplex object for the calling process.
virtual bool isConnected() const =0
Is connected?
Utilities for sharing state between multiple objects.
void doAfter(QFuture< T > future, QObject *context, F &&func)
Connect a slot or function to be invoked in the given context when a QFuture is finished.