Newer
Older
casic_unitree_dog / include / unitree / common / dds / dds_entity.hpp
#ifndef __UT_DDS_ENTITY_HPP__
#define __UT_DDS_ENTITY_HPP__

#include <atomic>
#include <memory>

#include <dds/dds.hpp>
#include <unitree/common/log/log.hpp>
#include <unitree/common/block_queue.hpp>
#include <unitree/common/thread/thread.hpp>
#include <unitree/common/time/time_tool.hpp>
#include <unitree/common/time/sleep.hpp>
#include <unitree/common/dds/dds_exception.hpp>
#include <unitree/common/dds/dds_callback.hpp>
#include <unitree/common/dds/dds_qos.hpp>
#include <unitree/common/dds/dds_traits.hpp>

#define __UT_DDS_NULL__ ::dds::core::null

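/*
 * Timing constants (in microseconds) used while waiting for a pub/sub match:
 *  - __UT_DDS_WAIT_MATCHED_TIME_SLICE: sleep interval between two polls,
 *    default 10000 us.
 *  - __UT_DDS_WAIT_MATCHED_TIME_MAX: upper bound on the total time spent
 *    waiting, default 1000000 us.
 */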
#define __UT_DDS_WAIT_MATCHED_TIME_SLICE 10000
#define __UT_DDS_WAIT_MATCHED_TIME_MAX   1000000

using namespace org::eclipse::cyclonedds;

namespace unitree
{
namespace common
{
/*
 * @brief: DdsLogger
 *
 * Common base class of the DDS entity wrappers declared in this header.
 * It gives derived classes access to a Logger pointer; the constructor and
 * destructor are only declared here and are defined elsewhere.
 */
class DdsLogger
{
public:
    DdsLogger();
    virtual ~DdsLogger();

protected:
    // Logger passed by derived classes to UT_DDS_EXCEPTION_CATCH / LOG_WARNING.
    // NOTE(review): presumably initialised by the constructor and not owned
    // by this object - confirm against the out-of-line implementation.
    Logger* mLogger;
};

/*
 * @brief: DdsParticipant
 *
 * Wrapper that holds a native ::dds::domain::DomainParticipant.
 * All member functions are only declared here; their definitions are not
 * part of this header.
 */
class DdsParticipant : public DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::domain::DomainParticipant;

    /*
     * @param domainId  id of the DDS domain.
     * @param qos       participant QoS wrapper.
     * @param config    optional string, empty by default.
     *                  NOTE(review): presumably a DDS configuration (content
     *                  or location) - confirm against the implementation.
     */
    explicit DdsParticipant(uint32_t domainId, const DdsParticipantQos& qos, const std::string& config = "");
    ~DdsParticipant();

    // Returns a const reference to a native participant (presumably mNative;
    // the definition is not visible in this header).
    const NATIVE_TYPE& GetNative() const;

private:
    // Wrapped native participant handle.
    NATIVE_TYPE mNative;
};

// Shared-ownership handle used by the other wrappers to refer to a participant.
using DdsParticipantPtr = std::shared_ptr<DdsParticipant>;


/*
 * @brief: DdsPublisher
 *
 * Wrapper that holds a native ::dds::pub::Publisher.
 * All member functions are only declared here; their definitions are not
 * part of this header.
 */
class DdsPublisher : public DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::pub::Publisher;

    /*
     * @param participant  participant wrapper the publisher is built from.
     * @param qos          publisher QoS wrapper.
     */
    explicit DdsPublisher(const DdsParticipantPtr& participant, const DdsPublisherQos& qos);
    ~DdsPublisher();

    // Returns a const reference to a native publisher (presumably mNative;
    // the definition is not visible in this header).
    const NATIVE_TYPE& GetNative() const;

private:
    // Wrapped native publisher handle.
    NATIVE_TYPE mNative;
};

// Shared-ownership handle to a DdsPublisher (taken by DdsWriter's constructor).
using DdsPublisherPtr = std::shared_ptr<DdsPublisher>;


/*
 * @brief: DdsSubscriber
 *
 * Wrapper that holds a native ::dds::sub::Subscriber.
 * All member functions are only declared here; their definitions are not
 * part of this header.
 */
class DdsSubscriber : public DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::sub::Subscriber;

    /*
     * @param participant  participant wrapper the subscriber is built from.
     * @param qos          subscriber QoS wrapper.
     */
    explicit DdsSubscriber(const DdsParticipantPtr& participant, const DdsSubscriberQos& qos);
    ~DdsSubscriber();

    // Returns a const reference to a native subscriber (presumably mNative;
    // the definition is not visible in this header).
    const NATIVE_TYPE& GetNative() const;

private:
    // Wrapped native subscriber handle.
    NATIVE_TYPE mNative;
};

// Shared-ownership handle to a DdsSubscriber (taken by DdsReader's constructor).
using DdsSubscriberPtr = std::shared_ptr<DdsSubscriber>;


/*
 * @brief: DdsTopic
 */
template<typename MSG>
class DdsTopic : public DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::topic::Topic<MSG>;

    explicit DdsTopic(const DdsParticipantPtr& participant, const std::string& name, const DdsTopicQos& qos) :
        mNative(__UT_DDS_NULL__)
    {
        UT_DDS_EXCEPTION_TRY

        auto topicQos = participant->GetNative().default_topic_qos();
        qos.CopyToNativeQos(topicQos);

        mNative = NATIVE_TYPE(participant->GetNative(), name, topicQos);

        UT_DDS_EXCEPTION_CATCH(mLogger, true)
    }

    ~DdsTopic()
    {
        mNative = __UT_DDS_NULL__;
    }

    const NATIVE_TYPE& GetNative() const
    {
        return mNative;
    }

private:
    NATIVE_TYPE mNative;
};

template<typename MSG>
using DdsTopicPtr = std::shared_ptr<DdsTopic<MSG>>;


/*
 * @brief: DdsWriter
 *
 * Wraps a native ::dds::pub::DataWriter<MSG> and offers a Write() call that
 * can optionally wait for a matched reader before publishing.
 */
template<typename MSG>
class DdsWriter : public DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::pub::DataWriter<MSG>;

    /*
     * Creates the native writer for `topic` on `publisher`.
     *
     * The publisher's default data-writer QoS is taken as the starting point
     * and `qos` is copied onto it. The whole sequence runs between
     * UT_DDS_EXCEPTION_TRY and UT_DDS_EXCEPTION_CATCH(mLogger, true).
     *
     * @param publisher  publisher wrapper; taken by const reference (like the
     *                   other entity constructors in this header) so that no
     *                   shared_ptr copy is made.
     * @param topic      topic wrapper the writer publishes on.
     * @param qos        writer QoS wrapper applied over the defaults.
     */
    explicit DdsWriter(const DdsPublisherPtr& publisher, const DdsTopicPtr<MSG>& topic, const DdsWriterQos& qos) :
        mNative(__UT_DDS_NULL__)
    {
        UT_DDS_EXCEPTION_TRY

        auto writerQos = publisher->GetNative().default_datawriter_qos();
        qos.CopyToNativeQos(writerQos);

        mNative = NATIVE_TYPE(publisher->GetNative(), topic->GetNative(), writerQos);

        UT_DDS_EXCEPTION_CATCH(mLogger, true)
    }

    // Resets the native handle to the DDS null reference.
    ~DdsWriter()
    {
        mNative = __UT_DDS_NULL__;
    }

    // Read-only access to the wrapped native writer.
    const NATIVE_TYPE& GetNative() const
    {
        return mNative;
    }

    /*
     * Publishes `message`.
     *
     * @param message       sample to write.
     * @param waitMicrosec  when > 0, WaitReader() is called first to give a
     *                      reader the chance to match (see WaitReader for the
     *                      actual time budget).
     * @return true when mNative.write() completed; false when control reaches
     *         the statement after UT_DDS_EXCEPTION_CATCH(mLogger, false),
     *         i.e. the write did not complete normally.
     */
    bool Write(const MSG& message, int64_t waitMicrosec)
    {
        if (waitMicrosec > 0)
        {
            WaitReader(waitMicrosec);
        }

        UT_DDS_EXCEPTION_TRY
        {
            mNative.write(message);
            return true;
        }
        UT_DDS_EXCEPTION_CATCH(mLogger, false)

        return false;
    }

private:
    /*
     * Polls until the writer reports at least one matched subscription or
     * the time budget is used up.
     *
     * Requests shorter than one time slice return immediately. Otherwise the
     * budget is half of `waitMicrosec`, capped at
     * __UT_DDS_WAIT_MATCHED_TIME_MAX, and is consumed in steps of
     * __UT_DDS_WAIT_MATCHED_TIME_SLICE.
     */
    void WaitReader(int64_t waitMicrosec)
    {
        if (waitMicrosec < __UT_DDS_WAIT_MATCHED_TIME_SLICE)
        {
            return;
        }

        int64_t waitTime = (waitMicrosec / 2);
        if (waitTime > __UT_DDS_WAIT_MATCHED_TIME_MAX)
        {
            waitTime = __UT_DDS_WAIT_MATCHED_TIME_MAX;
        }

        while (waitTime > 0 && mNative.publication_matched_status().current_count() == 0)
        {
            MicroSleep(__UT_DDS_WAIT_MATCHED_TIME_SLICE);
            waitTime -= __UT_DDS_WAIT_MATCHED_TIME_SLICE;
        }
    }

private:
    NATIVE_TYPE mNative;
};

// Shared-ownership handle to a DdsWriter<MSG>.
template<typename MSG>
using DdsWriterPtr = std::shared_ptr<DdsWriter<MSG>>;


/*
 * @brief: DdsReaderListener
 *
 * Data-reader listener that forwards valid samples to a DdsReaderCallback.
 *
 * Two delivery modes exist:
 *  - direct: the callback is invoked from on_data_available() itself;
 *  - queued: after SetQueue(len > 0) samples are copied into a BlockQueue and
 *    delivered to the callback from a dedicated worker thread.
 *
 * The worker thread captures `this`, so the object must not be copied; the
 * copy operations are deleted to make that explicit.
 */
template<typename MSG>
class DdsReaderListener : public ::dds::sub::NoOpDataReaderListener<MSG>, private DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::sub::DataReaderListener<MSG>;
    using MSG_PTR = std::shared_ptr<MSG>;

    explicit DdsReaderListener() :
        mHasQueue(false), mQuit(false), mMask(::dds::core::status::StatusMask::none()), mLastDataAvailableTime(0)
    {}

    DdsReaderListener(const DdsReaderListener&) = delete;
    DdsReaderListener& operator=(const DdsReaderListener&) = delete;

    /*
     * In queued mode: asks the worker thread to stop, interrupts the queue so
     * a blocked Get() returns, and waits for the thread to finish.
     */
    ~DdsReaderListener() override
    {
        if (mHasQueue)
        {
            mQuit = true;
            mDataQueuePtr->Interrupt(false);
            mDataQueueThreadPtr->Wait();
        }
    }

    /*
     * Stores a copy of `cb`. When `cb` has a message handler the
     * data_available bit is added to the status mask.
     */
    void SetCallback(const DdsReaderCallback& cb)
    {
        if (cb.HasMessageHandler())
        {
            mMask |= ::dds::core::status::StatusMask::data_available();
        }

        mCallbackPtr.reset(new DdsReaderCallback(cb));
    }

    /*
     * Switches to queued delivery with a queue of `len` entries and starts
     * the worker thread. Does nothing when `len <= 0`.
     *
     * A second call is ignored: replacing the queue and the thread handle
     * while the first worker is still running would leave that worker using
     * a destroyed queue and would never join it.
     */
    void SetQueue(int32_t len)
    {
        if (len <= 0 || mHasQueue)
        {
            return;
        }

        mDataQueuePtr.reset(new BlockQueue<MSG_PTR>(len));
        mHasQueue = true;

        auto queueThreadFunc = [this]() {
            // Wait until a callback with a message handler is installed.
            // mQuit is checked as well so the destructor can always join this
            // thread, even if no such callback is ever set.
            while (!mQuit && !(mCallbackPtr && mCallbackPtr->HasMessageHandler()))
            {
                MicroSleep(__UT_DDS_WAIT_MATCHED_TIME_SLICE);
            }

            while (!mQuit)
            {
                MSG_PTR dataPtr;
                if (mDataQueuePtr->Get(dataPtr) && dataPtr)
                {
                    mCallbackPtr->OnDataAvailable(dataPtr.get());
                }
            }
            return 0;
        };

        mDataQueueThreadPtr = CreateThreadEx("rlsnr", UT_CPU_ID_NONE, queueThreadFunc);
    }

    // Value of GetCurrentMonotonicTimeNanosecond() recorded for the most
    // recent valid sample, or 0 if none has been seen yet.
    int64_t GetLastDataAvailableTime() const
    {
        return mLastDataAvailableTime;
    }

    // This object viewed as the native listener interface. The pointer is
    // non-const because that is what the native listener setter is given.
    NATIVE_TYPE* GetNative() const
    {
        return static_cast<NATIVE_TYPE*>(const_cast<DdsReaderListener*>(this));
    }

    // Status mask to register this listener with (see SetCallback).
    const ::dds::core::status::StatusMask& GetStatusMask() const
    {
        return mMask;
    }

private:
    /*
     * Listener hook: takes all available samples from `reader` and delivers
     * each valid one, either through the queue or directly to the callback.
     */
    void on_data_available(::dds::sub::DataReader<MSG>& reader) override
    {
        ::dds::sub::LoanedSamples<MSG> samples = reader.take();

        for (const auto& sample : samples)
        {
            if (!sample.info().valid())
            {
                continue;
            }

            const MSG& m = sample.data();
            mLastDataAvailableTime = GetCurrentMonotonicTimeNanosecond();

            if (mHasQueue)
            {
                // The sample is only loaned, so the queue gets its own copy.
                if (!mDataQueuePtr->Put(std::make_shared<MSG>(m), true))
                {
                    LOG_WARNING(mLogger, "earliest mesage was evicted. type:", DdsGetTypeName(MSG));
                }
            }
            else
            {
                mCallbackPtr->OnDataAvailable(static_cast<const void*>(&m));
            }
        }
    }

private:
    bool mHasQueue;
    // Written by the destructor, read by the worker thread.
    std::atomic<bool> mQuit;

    ::dds::core::status::StatusMask mMask;
    // Written in on_data_available(), read through GetLastDataAvailableTime().
    std::atomic<int64_t> mLastDataAvailableTime;

    DdsReaderCallbackPtr mCallbackPtr;
    BlockQueuePtr<MSG_PTR> mDataQueuePtr;
    ThreadPtr mDataQueueThreadPtr;
};

// Shared-ownership handle to a DdsReaderListener<MSG>.
template<typename MSG>
using DdsReaderListenerPtr = std::shared_ptr<DdsReaderListener<MSG>>;


/*
 * @brief: DdsReader
 */
template<typename MSG>
class DdsReader : public DdsLogger
{
public:
    using NATIVE_TYPE = ::dds::sub::DataReader<MSG>;

    explicit DdsReader(const DdsSubscriberPtr& subscriber, const DdsTopicPtr<MSG>& topic, const DdsReaderQos& qos) :
        mNative(__UT_DDS_NULL__)
    {
        UT_DDS_EXCEPTION_TRY

        auto readerQos = subscriber->GetNative().default_datareader_qos();
        qos.CopyToNativeQos(readerQos);

        mNative = NATIVE_TYPE(subscriber->GetNative(), topic->GetNative(), readerQos);

        UT_DDS_EXCEPTION_CATCH(mLogger, true)
    }

    ~DdsReader()
    {
        mNative = __UT_DDS_NULL__;
    }

    const NATIVE_TYPE& GetNative() const
    {
        return mNative;
    }

    void SetListener(const DdsReaderCallback& cb, int32_t qlen)
    {
        mListener.SetCallback(cb);
        mListener.SetQueue(qlen);
        mNative.listener(mListener.GetNative(), mListener.GetStatusMask());
    }

    int64_t GetLastDataAvailableTime() const
    {
        return mListener.GetLastDataAvailableTime();
    }

private:
    NATIVE_TYPE mNative;
    DdsReaderListener<MSG> mListener;
};

template<typename MSG>
using DdsReaderPtr = std::shared_ptr<DdsReader<MSG>>;

}
}

#endif//__UT_DDS_ENTITY_HPP__