blob: 6344edc7d8407bc07df4f04e8991d1e65d120109 [file] [log] [blame]
/*
* Copyright (C) 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "src/trace_processor/rpc/query_result_serializer.h"
#include <cstring>
#include <deque>
#include <memory>
#include <ostream>
#include <random>
#include <string>
#include <vector>
#include "perfetto/ext/base/string_utils.h"
#include "perfetto/trace_processor/basic_types.h"
#include "perfetto/trace_processor/trace_processor.h"
#include "test/gtest_and_gmock.h"
#include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
namespace perfetto {
namespace trace_processor {
// Equality for SqlValue, needed by ASSERT_THAT/EXPECT_THAT(ElementsAre(...)).
// Two values are equal when they have the same type and the same payload:
// strings are compared by content, blobs byte-by-byte, longs by value and
// doubles bit-for-bit (so the exact serialized representation must round
// trip). NULL carries no payload, so any two NULLs are equal.
inline bool operator==(const SqlValue& a, const SqlValue& b) {
  if (a.type != b.type)
    return false;
  switch (a.type) {
    case SqlValue::kNull:
      // Don't look at the union: it holds no meaningful value for NULL.
      return true;
    case SqlValue::kLong:
      return a.long_value == b.long_value;
    case SqlValue::kDouble:
      // Bitwise comparison through the active member (not via long_value,
      // which would read an inactive union member).
      return memcmp(&a.double_value, &b.double_value, sizeof(double)) == 0;
    case SqlValue::kString:
      return strcmp(a.string_value, b.string_value) == 0;
    case SqlValue::kBytes:
      if (a.bytes_count != b.bytes_count)
        return false;
      // Empty blobs may come with null pointers; memcmp must not see those.
      return a.bytes_count == 0 ||
             memcmp(a.bytes_value, b.bytes_value, a.bytes_count) == 0;
  }
  return false;
}
// Pretty-printer used by gtest/gmock to describe SqlValues in failure
// messages, e.g. SqlValue{Long 42} or SqlValue{Bytes[3]:616263}.
inline std::ostream& operator<<(std::ostream& stream, const SqlValue& v) {
  stream << "SqlValue{";
  switch (v.type) {
    case SqlValue::kString:
      stream << "\"" << v.string_value << "\"}";
      break;
    case SqlValue::kBytes: {
      const char* raw = reinterpret_cast<const char*>(v.bytes_value);
      stream << "Bytes[" << v.bytes_count << "]:"
             << base::ToHex(raw, v.bytes_count) << "}";
      break;
    }
    case SqlValue::kLong:
      stream << "Long " << v.long_value << "}";
      break;
    case SqlValue::kDouble:
      stream << "Double " << v.double_value << "}";
      break;
    case SqlValue::kNull:
      stream << "NULL}";
      break;
  }
  return stream;
}
namespace {
using ::testing::ElementsAre;
using BatchProto = protos::pbzero::QueryResult::CellsBatch;
using ResultProto = protos::pbzero::QueryResult;
// Runs |query| on |tp|, steps the resulting iterator once and records a
// fatal gtest failure (with the SQL error message) if its status is not ok.
// Note: the fatal failure only returns from this helper, not from the caller.
void RunQueryChecked(TraceProcessor* tp, const std::string& query) {
  auto rows = tp->ExecuteQuery(query);
  rows.Next();  // Step once before inspecting Status().
  ASSERT_TRUE(rows.Status().ok())
      << rows.Status().message();
}
// Minimal decoder for the QueryResult protos emitted by QueryResultSerializer.
// Decoded column names, cells and error text accumulate in the public fields.
class TestDeserializer {
 public:
  // Pulls chunks from |serializer| and decodes them until the last batch.
  void SerializeAndDeserialize(QueryResultSerializer* serializer);

  // Decodes a single serialized QueryResult of |size| bytes at |start|.
  void DeserializeBuffer(const uint8_t* start, size_t size);

  std::vector<std::string> columns;  // Column names, in order.
  std::vector<SqlValue> cells;       // Decoded cells, row after row.
  std::string error;                 // Concatenation of all error fields.
  bool eof_reached{false};           // Set once the last batch is decoded.

 private:
  // Owns the bytes that string/blob SqlValues in |cells| point to.
  std::vector<std::unique_ptr<char[]>> copied_buf_;
};
// Repeatedly asks |serializer| for the next chunk and decodes it, until a
// batch flagged as the last one has been decoded. Results from any previous
// call are discarded first. Stops as soon as decoding records a fatal gtest
// failure instead of continuing to pull (and mis-decode) more data.
void TestDeserializer::SerializeAndDeserialize(
    QueryResultSerializer* serializer) {
  std::vector<uint8_t> buf;
  columns.clear();
  cells.clear();
  error.clear();
  for (eof_reached = false; !eof_reached;) {
    serializer->Serialize(&buf);
    // ASSERT_* inside DeserializeBuffer() only returns from that function;
    // propagate fatal failures so this loop terminates too.
    ASSERT_NO_FATAL_FAILURE(DeserializeBuffer(buf.data(), buf.size()));
    buf.clear();
  }
}
// Decodes one serialized QueryResult message of |size| bytes at |start| and
// appends its column names, cells and error text to the public fields.
// Within each CellsBatch, |cells| lists the cell types while the values live
// in per-type side arrays (varints, doubles, blobs and NUL-separated
// strings); values are taken from those arrays in the order the cell types
// appear. String and blob payloads are copied into |copied_buf_| so the
// SqlValues appended to |cells| stay valid after |start| is released.
void TestDeserializer::DeserializeBuffer(const uint8_t* start, size_t size) {
  ResultProto::Decoder result(start, size);
  error += result.error().ToStdString();
  for (auto it = result.column_names(); it; ++it)
    columns.push_back(it->as_std_string());
  for (auto batch_it = result.batch(); batch_it; ++batch_it) {
    // No batch may follow the one flagged as the last.
    ASSERT_FALSE(eof_reached);
    auto batch_bytes = batch_it->as_bytes();
    ResultProto::CellsBatch::Decoder batch(batch_bytes.data, batch_bytes.size);
    eof_reached = batch.is_last_batch();

    // Per-type value queues, consumed front-to-back by the cell loop below.
    std::deque<int64_t> varints;
    std::deque<double> doubles;
    std::deque<std::string> blobs;
    bool parse_error = false;
    for (auto it = batch.varint_cells(&parse_error); it; ++it)
      varints.emplace_back(*it);
    for (auto it = batch.float64_cells(&parse_error); it; ++it)
      doubles.emplace_back(*it);
    for (auto it = batch.blob_cells(); it; ++it)
      blobs.emplace_back((*it).ToStdString());

    // |string_cells| holds all the strings of the batch, separated by NULs.
    std::string merged_strings = batch.string_cells().ToStdString();
    std::deque<std::string> strings;
    for (size_t pos = 0; pos < merged_strings.size();) {
      // find() returns npos if the last string has no trailing NUL; substr()
      // clamps the length, so the remaining tail is still taken as a whole.
      size_t next_sep = merged_strings.find('\0', pos);
      strings.emplace_back(merged_strings.substr(pos, next_sep - pos));
      pos = next_sep == std::string::npos ? next_sep : next_sep + 1;
    }

    uint32_t num_cells = 0;
    for (auto it = batch.cells(&parse_error); it; ++it, ++num_cells) {
      uint8_t cell_type = static_cast<uint8_t>(*it);
      switch (cell_type) {
        case BatchProto::CELL_INVALID:
          break;
        case BatchProto::CELL_NULL:
          cells.emplace_back(SqlValue());
          break;
        case BatchProto::CELL_VARINT:
          ASSERT_GT(varints.size(), 0u);
          cells.emplace_back(SqlValue::Long(varints.front()));
          varints.pop_front();
          break;
        case BatchProto::CELL_FLOAT64:
          ASSERT_GT(doubles.size(), 0u);
          cells.emplace_back(SqlValue::Double(doubles.front()));
          doubles.pop_front();
          break;
        case BatchProto::CELL_STRING: {
          ASSERT_GT(strings.size(), 0u);
          const std::string& str = strings.front();
          // +1 keeps the NUL terminator of the C string handed to SqlValue.
          copied_buf_.emplace_back(std::make_unique<char[]>(str.size() + 1));
          char* new_buf = copied_buf_.back().get();
          memcpy(new_buf, str.c_str(), str.size() + 1);
          cells.emplace_back(SqlValue::String(new_buf));
          strings.pop_front();
          break;
        }
        case BatchProto::CELL_BLOB: {
          ASSERT_GT(blobs.size(), 0u);
          // Bind by reference: the bytes are copied into |copied_buf_| below,
          // an intermediate std::string copy is not needed.
          const std::string& blob = blobs.front();
          copied_buf_.emplace_back(std::make_unique<char[]>(blob.size()));
          memcpy(copied_buf_.back().get(), blob.data(), blob.size());
          cells.emplace_back(
              SqlValue::Bytes(copied_buf_.back().get(), blob.size()));
          blobs.pop_front();
          break;
        }
        default:
          // Print as a number: streaming a uint8_t would emit a raw char.
          FAIL() << "Unknown cell type " << static_cast<int>(cell_type);
      }
    }
    // Checked once per batch, after all packed fields have been walked, so a
    // malformed varint/float64 array is reported even for a batch without
    // cells.
    EXPECT_FALSE(parse_error);

    // Each batch must contain whole rows only.
    if (columns.empty()) {
      EXPECT_EQ(num_cells, 0u);
    } else {
      EXPECT_EQ(num_cells % columns.size(), 0u);
    }
  }
}
// Round-trips a single-row result that exercises every cell type: integers
// of several magnitudes, a double, a string and a blob.
TEST(QueryResultSerializerTest, ShortBatch) {
  auto processor = TraceProcessor::CreateInstance(trace_processor::Config());
  auto rows = processor->ExecuteQuery(
      "select 1 as i8, 128 as i16, 100000 as i32, 42001001001 as i64, 1e9 as "
      "f64, 'a_string' as str, cast('a_blob' as blob) as blb");
  QueryResultSerializer serializer(std::move(rows));

  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  EXPECT_THAT(decoded.columns,
              ElementsAre("i8", "i16", "i32", "i64", "f64", "str", "blb"));
  EXPECT_THAT(decoded.cells,
              ElementsAre(SqlValue::Long(1), SqlValue::Long(128),
                          SqlValue::Long(100000), SqlValue::Long(42001001001),
                          SqlValue::Double(1e9), SqlValue::String("a_string"),
                          SqlValue::Bytes("a_blob", 6)));
}
// Round-trips a long result (kNumRows rows x 4 columns) using the default
// batch limits and verifies every decoded row, not just a prefix of them.
TEST(QueryResultSerializerTest, LongBatch) {
  static constexpr uint32_t kNumRows = 8192;
  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
  RunQueryChecked(tp.get(), "create virtual table win using window;");
  RunQueryChecked(tp.get(), "update win set window_start=0, window_dur=" +
                                std::to_string(kNumRows) +
                                ", quantum=1 where rowid = 0");
  auto iter = tp->ExecuteQuery(
      "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win");
  QueryResultSerializer ser(std::move(iter));
  TestDeserializer deser;
  deser.SerializeAndDeserialize(&ser);
  ASSERT_THAT(deser.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
  ASSERT_EQ(deser.cells.size(), 4 * kNumRows);
  // Previously only the first 1024 of the kNumRows rows were checked.
  for (uint32_t row = 0; row < kNumRows; row++) {
    uint32_t cell = row * 4;
    ASSERT_EQ(deser.cells[cell].type, SqlValue::kString);
    ASSERT_STREQ(deser.cells[cell].string_value, "x");
    ASSERT_EQ(deser.cells[cell + 1].type, SqlValue::kLong);
    ASSERT_EQ(deser.cells[cell + 1].long_value, row);
    ASSERT_EQ(deser.cells[cell + 2].type, SqlValue::kDouble);
    ASSERT_EQ(deser.cells[cell + 2].double_value, 1.0);
    ASSERT_EQ(deser.cells[cell + 3].type, SqlValue::kLong);
    ASSERT_EQ(deser.cells[cell + 3].long_value, row);
  }
}
// Serializes 1024 rows x 4 columns with a tiny binary payload limit (32)
// compared to the cell limit (1024), targeting batches cut by payload size,
// and checks that no cell is lost.
TEST(QueryResultSerializerTest, BatchSaturatingBinaryPayload) {
  auto processor = TraceProcessor::CreateInstance(trace_processor::Config());
  RunQueryChecked(processor.get(), "create virtual table win using window;");
  RunQueryChecked(processor.get(),
                  "update win set window_start=0, window_dur=1024, quantum=1 "
                  "where rowid = 0");
  auto rows = processor->ExecuteQuery(
      "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win");
  QueryResultSerializer serializer(std::move(rows));
  serializer.set_batch_size_for_testing(1024, 32);

  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  ASSERT_THAT(decoded.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
  ASSERT_EQ(decoded.cells.size(), 1024 * 4u);
}
// Serializes 4 rows x 4 columns with a cell limit of 16 and a roomy payload
// limit (4096), targeting batches cut by cell count, and checks the total.
TEST(QueryResultSerializerTest, BatchSaturatingNumCells) {
  auto processor = TraceProcessor::CreateInstance(trace_processor::Config());
  RunQueryChecked(processor.get(), "create virtual table win using window;");
  RunQueryChecked(processor.get(),
                  "update win set window_start=0, window_dur=4, quantum=1 "
                  "where rowid = 0");
  auto rows = processor->ExecuteQuery(
      "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win");
  QueryResultSerializer serializer(std::move(rows));
  serializer.set_batch_size_for_testing(16, 4096);

  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  ASSERT_THAT(decoded.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
  ASSERT_EQ(decoded.cells.size(), 16u);
}
// Inserts 32 rows cycling through NULL, blob, string, string with payloads of
// 0, 32K, 64K or 96K characters, then checks they round-trip unchanged.
TEST(QueryResultSerializerTest, LargeStringAndBlobs) {
  auto processor = TraceProcessor::CreateInstance(trace_processor::Config());
  RunQueryChecked(processor.get(), "create table tab (colz);");

  std::minstd_rand0 rnd_engine(0);
  std::vector<SqlValue> expected;
  std::string sql_values;
  // std::deque keeps existing elements in place on emplace_back, which the
  // String/Bytes values stored in |expected| rely on.
  std::deque<std::string> backing;

  for (size_t n = 0; n < 32; n++) {
    const size_t len = (rnd_engine() % 4) * 32 * 1024;
    std::string payload(len, '\0');
    for (size_t i = 0; i < len; i++)
      payload[i] = static_cast<char>('A' + ((n * 11 + i) % 25));

    switch (n % 4) {
      case 0:
        sql_values += "(NULL),";
        expected.emplace_back(SqlValue());  // NULL.
        break;
      case 1:  // Blob.
        sql_values += "(X'" + base::ToHex(payload) + "'),";
        backing.emplace_back(std::move(payload));
        expected.emplace_back(
            SqlValue::Bytes(backing.back().data(), backing.back().size()));
        break;
      default:  // String.
        sql_values += "('" + payload + "'),";
        backing.emplace_back(std::move(payload));
        expected.emplace_back(SqlValue::String(backing.back().c_str()));
        break;
    }
  }
  sql_values.pop_back();  // Drop the trailing comma.
  RunQueryChecked(processor.get(), "insert into tab (colz) values " + sql_values);

  auto rows = processor->ExecuteQuery("select colz from tab");
  QueryResultSerializer serializer(std::move(rows));
  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  ASSERT_EQ(decoded.cells.size(), expected.size());
  for (size_t idx = 0; idx < expected.size(); idx++) {
    EXPECT_EQ(decoded.cells[idx], expected[idx]) << "Cell " << idx;
  }
}
// Fills a 3-column table with 1000 rows of random NULLs, integers, doubles,
// strings and blobs, then round-trips it ten times with random batch limits
// and checks every cell against the expected values.
TEST(QueryResultSerializerTest, RandomSizes) {
  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
  static constexpr uint32_t kNumCells = 3 * 1000;
  RunQueryChecked(tp.get(), "create table tab (a, b, c);");
  std::vector<SqlValue> expected;
  expected.reserve(kNumCells);
  // std::deque keeps existing elements in place on emplace_back, so the
  // String/Bytes values in |expected| can safely point into it.
  std::deque<std::string> string_buf;
  std::minstd_rand0 rnd_engine(0);
  std::string insert_values;
  for (uint32_t i = 0; i < kNumCells; i++) {
    const uint32_t col = i % 3;
    if (col == 0)
      insert_values += "(";
    const int type = static_cast<int>(rnd_engine() % 5);
    if (type == 0) {
      expected.emplace_back(SqlValue());  // NULL
      insert_values += "NULL";
    } else if (type == 1) {
      // int64_t rather than long: long is only 32 bits on some platforms.
      expected.emplace_back(
          SqlValue::Long(static_cast<int64_t>(rnd_engine())));
      insert_values += std::to_string(expected.back().long_value);
    } else if (type == 2) {
      // minstd_rand0 yields integers below 2^31, so the "%f" formatting used
      // by std::to_string reproduces the double exactly.
      expected.emplace_back(
          SqlValue::Double(static_cast<double>(rnd_engine())));
      insert_values += std::to_string(expected.back().double_value);
    } else {
      // type 3: hex-encoded random bytes stored as a string.
      // type 4: the raw random bytes stored as a blob.
      size_t len = (rnd_engine() % 5) * 32;
      std::string rndstr;
      rndstr.resize(len);
      for (size_t n = 0; n < len; n++)
        rndstr[n] = static_cast<char>(rnd_engine() % 256);
      auto rndstr_hex = base::ToHex(rndstr);
      if (type == 3) {
        // Single quotes: in SQLite double quotes delimit identifiers and are
        // only accepted as string literals through the legacy DQS quirk.
        insert_values += "'" + rndstr_hex + "'";
        string_buf.emplace_back(std::move(rndstr_hex));
        expected.emplace_back(SqlValue::String(string_buf.back().c_str()));
      } else {
        insert_values += "X'" + rndstr_hex + "'";
        string_buf.emplace_back(std::move(rndstr));
        expected.emplace_back(SqlValue::Bytes(string_buf.back().data(),
                                              string_buf.back().size()));
      }
    }
    if (col < 2) {
      insert_values += ",";
    } else {
      insert_values += "),";
      // Flush roughly every 1MB of SQL text, and after the last row.
      if (insert_values.size() > 1024 * 1024 || i == kNumCells - 1) {
        insert_values.back() = ';';
        auto query = "insert into tab (a,b,c) values " + insert_values;
        insert_values.clear();
        RunQueryChecked(tp.get(), query);
      }
    }
  }
  // Serialize and de-serialize with different batch and payload sizes.
  for (int rep = 0; rep < 10; rep++) {
    auto iter = tp->ExecuteQuery("select * from tab");
    QueryResultSerializer ser(std::move(iter));
    uint32_t cells_per_batch = 1 << (rnd_engine() % 8 + 2);
    uint32_t binary_payload_size = 1 << (rnd_engine() % 8 + 8);
    ser.set_batch_size_for_testing(cells_per_batch, binary_payload_size);
    TestDeserializer deser;
    deser.SerializeAndDeserialize(&ser);
    ASSERT_EQ(deser.cells.size(), expected.size());
    for (size_t i = 0; i < expected.size(); i++) {
      EXPECT_EQ(deser.cells[i], expected[i]) << "Cell " << i;
    }
  }
}
// A syntactically incomplete statement fails before producing any row: the
// serializer must report the error, emit no cells and still signal EOF.
TEST(QueryResultSerializerTest, ErrorBeforeStartingQuery) {
  auto processor = TraceProcessor::CreateInstance(trace_processor::Config());
  auto rows = processor->ExecuteQuery("insert into incomplete_input");
  QueryResultSerializer serializer(std::move(rows));

  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  EXPECT_EQ(decoded.cells.size(), 0u);
  EXPECT_EQ(decoded.error, "Error: incomplete input (errcode: 1)");
  EXPECT_TRUE(decoded.eof_reached);
}
// The first two rows split 'a;b' into "a" and "b"; the third row's
// non-integer index is expected to make str_split fail. Rows produced before
// the failure must still be delivered, followed by the error and EOF.
TEST(QueryResultSerializerTest, ErrorAfterSomeResults) {
  auto processor = TraceProcessor::CreateInstance(trace_processor::Config());
  RunQueryChecked(processor.get(), "create table tab (x)");
  RunQueryChecked(processor.get(),
                  "insert into tab (x) values (0), (1), ('error')");
  auto rows =
      processor->ExecuteQuery("select str_split('a;b', ';', x) as s from tab");
  QueryResultSerializer serializer(std::move(rows));

  TestDeserializer decoded;
  decoded.SerializeAndDeserialize(&serializer);

  EXPECT_NE(decoded.error, "");
  EXPECT_THAT(decoded.cells,
              ElementsAre(SqlValue::String("a"), SqlValue::String("b")));
  EXPECT_TRUE(decoded.eof_reached);
}
// A statement returning no rows (CREATE TABLE) must serialize cleanly with no
// cells and still signal EOF, and its side effect must actually take place.
TEST(QueryResultSerializerTest, NoResultQuery) {
  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
  {
    auto iter = tp->ExecuteQuery("create table tab (x)");
    QueryResultSerializer ser(std::move(iter));
    TestDeserializer deser;
    deser.SerializeAndDeserialize(&ser);
    EXPECT_EQ(deser.error, "");
    EXPECT_EQ(deser.cells.size(), 0u);
    EXPECT_TRUE(deser.eof_reached);
  }
  // Check that the table has been created for real.
  {
    auto iter = tp->ExecuteQuery("select count(*) from tab");
    QueryResultSerializer ser(std::move(iter));
    TestDeserializer deser;
    deser.SerializeAndDeserialize(&ser);
    EXPECT_EQ(deser.error, "");
    // The table was just created and never filled: exactly one cell, 0.
    EXPECT_THAT(deser.cells, ElementsAre(SqlValue::Long(0)));
    EXPECT_TRUE(deser.eof_reached);
  }
}
} // namespace
} // namespace trace_processor
} // namespace perfetto