// Example of parsing JSON to document by parts.

// Using C++11 threads
// Temporarily disable for clang (older version) due to incompatibility with libstdc++
#if (__cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)) && !defined(__clang__)
|
||
|
|
||
|
#include "rapidjson/document.h"
#include "rapidjson/error/en.h"
#include "rapidjson/writer.h"
#include "rapidjson/ostreamwrapper.h"
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <thread>
|
||
|
|
||
|
using namespace rapidjson;
|
||
|
|
||
|
template<unsigned parseFlags = kParseDefaultFlags>
|
||
|
class AsyncDocumentParser {
|
||
|
public:
|
||
|
AsyncDocumentParser(Document& d)
|
||
|
: stream_(*this)
|
||
|
, d_(d)
|
||
|
, parseThread_()
|
||
|
, mutex_()
|
||
|
, notEmpty_()
|
||
|
, finish_()
|
||
|
, completed_()
|
||
|
{
|
||
|
// Create and execute thread after all member variables are initialized.
|
||
|
parseThread_ = std::thread(&AsyncDocumentParser::Parse, this);
|
||
|
}
|
||
|
|
||
|
~AsyncDocumentParser() {
|
||
|
if (!parseThread_.joinable())
|
||
|
return;
|
||
|
|
||
|
{
|
||
|
std::unique_lock<std::mutex> lock(mutex_);
|
||
|
|
||
|
// Wait until the buffer is read up (or parsing is completed)
|
||
|
while (!stream_.Empty() && !completed_)
|
||
|
finish_.wait(lock);
|
||
|
|
||
|
// Automatically append '\0' as the terminator in the stream.
|
||
|
static const char terminator[] = "";
|
||
|
stream_.src_ = terminator;
|
||
|
stream_.end_ = terminator + 1;
|
||
|
notEmpty_.notify_one(); // unblock the AsyncStringStream
|
||
|
}
|
||
|
|
||
|
parseThread_.join();
|
||
|
}
|
||
|
|
||
|
void ParsePart(const char* buffer, size_t length) {
|
||
|
std::unique_lock<std::mutex> lock(mutex_);
|
||
|
|
||
|
// Wait until the buffer is read up (or parsing is completed)
|
||
|
while (!stream_.Empty() && !completed_)
|
||
|
finish_.wait(lock);
|
||
|
|
||
|
// Stop further parsing if the parsing process is completed.
|
||
|
if (completed_)
|
||
|
return;
|
||
|
|
||
|
// Set the buffer to stream and unblock the AsyncStringStream
|
||
|
stream_.src_ = buffer;
|
||
|
stream_.end_ = buffer + length;
|
||
|
notEmpty_.notify_one();
|
||
|
}
|
||
|
|
||
|
private:
|
||
|
void Parse() {
|
||
|
d_.ParseStream<parseFlags>(stream_);
|
||
|
|
||
|
// The stream may not be fully read, notify finish anyway to unblock ParsePart()
|
||
|
std::unique_lock<std::mutex> lock(mutex_);
|
||
|
completed_ = true; // Parsing process is completed
|
||
|
finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting.
|
||
|
}
|
||
|
|
||
|
struct AsyncStringStream {
|
||
|
typedef char Ch;
|
||
|
|
||
|
AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {}
|
||
|
|
||
|
char Peek() const {
|
||
|
std::unique_lock<std::mutex> lock(parser_.mutex_);
|
||
|
|
||
|
// If nothing in stream, block to wait.
|
||
|
while (Empty())
|
||
|
parser_.notEmpty_.wait(lock);
|
||
|
|
||
|
return *src_;
|
||
|
}
|
||
|
|
||
|
char Take() {
|
||
|
std::unique_lock<std::mutex> lock(parser_.mutex_);
|
||
|
|
||
|
// If nothing in stream, block to wait.
|
||
|
while (Empty())
|
||
|
parser_.notEmpty_.wait(lock);
|
||
|
|
||
|
count_++;
|
||
|
char c = *src_++;
|
||
|
|
||
|
// If all stream is read up, notify that the stream is finish.
|
||
|
if (Empty())
|
||
|
parser_.finish_.notify_one();
|
||
|
|
||
|
return c;
|
||
|
}
|
||
|
|
||
|
size_t Tell() const { return count_; }
|
||
|
|
||
|
// Not implemented
|
||
|
char* PutBegin() { return 0; }
|
||
|
void Put(char) {}
|
||
|
void Flush() {}
|
||
|
size_t PutEnd(char*) { return 0; }
|
||
|
|
||
|
bool Empty() const { return src_ == end_; }
|
||
|
|
||
|
AsyncDocumentParser& parser_;
|
||
|
const char* src_; //!< Current read position.
|
||
|
const char* end_; //!< End of buffer
|
||
|
size_t count_; //!< Number of characters taken so far.
|
||
|
};
|
||
|
|
||
|
AsyncStringStream stream_;
|
||
|
Document& d_;
|
||
|
std::thread parseThread_;
|
||
|
std::mutex mutex_;
|
||
|
std::condition_variable notEmpty_;
|
||
|
std::condition_variable finish_;
|
||
|
bool completed_;
|
||
|
};
|
||
|
|
||
|
int main() {
|
||
|
Document d;
|
||
|
|
||
|
{
|
||
|
AsyncDocumentParser<> parser(d);
|
||
|
|
||
|
const char json1[] = " { \"hello\" : \"world\", \"t\" : tr";
|
||
|
//const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // For test parsing error
|
||
|
const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14";
|
||
|
const char json3[] = "16, \"a\":[1, 2, 3, 4] } ";
|
||
|
|
||
|
parser.ParsePart(json1, sizeof(json1) - 1);
|
||
|
parser.ParsePart(json2, sizeof(json2) - 1);
|
||
|
parser.ParsePart(json3, sizeof(json3) - 1);
|
||
|
}
|
||
|
|
||
|
if (d.HasParseError()) {
|
||
|
std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl;
|
||
|
return EXIT_FAILURE;
|
||
|
}
|
||
|
|
||
|
// Stringify the JSON to cout
|
||
|
OStreamWrapper os(std::cout);
|
||
|
Writer<OStreamWrapper> writer(os);
|
||
|
d.Accept(writer);
|
||
|
std::cout << std::endl;
|
||
|
|
||
|
return EXIT_SUCCESS;
|
||
|
}
|
||
|
|
||
|
#else // Not supporting C++11
|
||
|
|
||
|
#include <iostream>
|
||
|
/// Fallback entry point used when the C++11 branch is compiled out: prints a notice
/// and exits successfully.
int main() {
    std::cout << "This example requires C++11 compiler" << std::endl;
    return 0;
}
|
||
|
|
||
|
#endif
|