blob: 79a5c8f43848c2d4bde5b9dbeac89fd32b1eaba4 [file] [log] [blame]
/*
* Copyright 2015 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cobalt/storage/storage_manager.h"
#include <string>
#include <vector>
#include "base/bind.h"
#include "base/debug/trace_event.h"
#include "base/stringprintf.h"
#include "cobalt/storage/savegame_thread.h"
#include "cobalt/storage/upgrade/upgrade_reader.h"
#include "cobalt/storage/virtual_file.h"
#include "cobalt/storage/virtual_file_system.h"
#include "sql/statement.h"
#include "third_party/sqlite/sqlite3.h"
namespace cobalt {
namespace storage {
namespace {
// Flush() delays for a while to avoid spamming writes to disk.
// We often get a bunch of Flush() calls in a row.
const int kDatabaseFlushDelayMs = 100;
const char kDefaultSaveFile[] = "cobalt_save.bin";
void SqlDisableJournal(sql::Connection* connection) {
// Disable journaling for our in-memory database.
sql::Statement disable_journal(
connection->GetUniqueStatement("PRAGMA journal_mode=OFF"));
bool ok = disable_journal.Step();
DCHECK(ok);
}
int SqlQueryUserVersion(sql::Connection* connection) {
sql::Statement get_db_version(
connection->GetUniqueStatement("PRAGMA user_version"));
bool ok = get_db_version.Step();
DCHECK(ok);
return get_db_version.ColumnInt(0);
}
bool SqlQueryTableExists(sql::Connection* connection, const char* table_name) {
sql::Statement get_exists(connection->GetUniqueStatement(
"SELECT name FROM sqlite_master WHERE name = ? AND type = 'table'"));
get_exists.BindString(0, table_name);
return get_exists.Step();
}
int SqlQuerySchemaVersion(sql::Connection* connection, const char* table_name) {
sql::Statement get_version(connection->GetUniqueStatement(
"SELECT version FROM SchemaTable WHERE name = ?"));
get_version.BindString(0, table_name);
bool row_found = get_version.Step();
if (row_found) {
return get_version.ColumnInt(0);
} else {
return -1;
}
}
void SqlUpdateSchemaVersion(sql::Connection* connection, const char* table_name,
int version) {
sql::Statement update_version(connection->GetUniqueStatement(
"INSERT INTO SchemaTable (name, version)"
"VALUES (?, ?)"));
update_version.BindString(0, table_name);
update_version.BindInt(1, version);
bool ok = update_version.Run();
DCHECK(ok);
}
void SqlCreateSchemaTable(sql::Connection* connection) {
// Create the schema table.
sql::Statement create_table(connection->GetUniqueStatement(
"CREATE TABLE IF NOT EXISTS SchemaTable ("
"name TEXT, "
"version INTEGER, "
"UNIQUE(name, version) ON CONFLICT REPLACE)"));
bool ok = create_table.Run();
DCHECK(ok);
}
void SqlUpdateDatabaseUserVersion(sql::Connection* connection) {
// Update the DB version which will be read in next time.
// NOTE: Pragma statements cannot be bound, so we must construct the string
// in full.
std::string set_db_version_str = base::StringPrintf(
"PRAGMA user_version = %d", StorageManager::kDatabaseUserVersion);
sql::Statement set_db_version(
connection->GetUniqueStatement(set_db_version_str.c_str()));
bool ok = set_db_version.Run();
DCHECK(ok);
}
const std::string& GetFirstValidDatabaseFile(
const std::vector<std::string>& filenames) {
// Caller must ensure at least one file exists.
DCHECK_GT(filenames.size(), size_t(0));
for (size_t i = 0; i < filenames.size(); ++i) {
sql::Connection connection;
bool is_opened = connection.Open(FilePath(filenames[i]));
if (!is_opened) {
continue;
}
int err = connection.ExecuteAndReturnErrorCode("pragma schema_version;");
if (err != SQLITE_OK) {
continue;
}
// File can be opened as a database.
return filenames[i];
}
// Caller must handle case where a valid database file cannot be found.
DLOG(WARNING) << "Cannot find valid database file in save data";
return filenames[0];
}
} // namespace
StorageManager::StorageManager(scoped_ptr<UpgradeHandler> upgrade_handler,
const Options& options)
: upgrade_handler_(upgrade_handler.Pass()),
options_(options),
sql_thread_(new base::Thread("StorageManager SQL")),
ALLOW_THIS_IN_INITIALIZER_LIST(sql_context_(new SqlContext(this))),
connection_(new sql::Connection()),
loaded_database_version_(0),
initialized_(false),
flush_processing_(false),
flush_requested_(false),
no_flushes_pending_(true /* manual reset */,
true /* initially signalled */) {
DCHECK(upgrade_handler_);
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
savegame_thread_.reset(new SavegameThread(options_.savegame_options));
// Start the savegame load immediately.
sql_thread_->Start();
sql_message_loop_ = sql_thread_->message_loop_proxy();
}
StorageManager::~StorageManager() {
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
DCHECK(!sql_message_loop_->BelongsToCurrentThread());
// Wait for all I/O operations to complete.
FinishIO();
// Destroy various objects on the proper thread.
sql_message_loop_->PostTask(FROM_HERE, base::Bind(&StorageManager::OnDestroy,
base::Unretained(this)));
// Force all tasks to finish. Then we can safely let the rest of our
// member variables be destroyed.
sql_thread_.reset();
}
void StorageManager::GetSqlContext(const SqlCallback& callback) {
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
sql_message_loop_->PostTask(FROM_HERE,
base::Bind(callback, sql_context_.get()));
}
void StorageManager::Flush() {
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
// Make sure this runs on the correct thread.
if (MessageLoop::current()->message_loop_proxy() != sql_message_loop_) {
sql_message_loop_->PostTask(
FROM_HERE, base::Bind(&StorageManager::Flush, base::Unretained(this)));
return;
}
flush_timer_->Start(FROM_HERE,
base::TimeDelta::FromMilliseconds(kDatabaseFlushDelayMs),
this, &StorageManager::OnFlushTimerFired);
}
void StorageManager::FlushNow(const base::Closure& callback) {
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
// Make sure this runs on the correct thread.
if (MessageLoop::current()->message_loop_proxy() != sql_message_loop_) {
sql_message_loop_->PostTask(
FROM_HERE, base::Bind(&StorageManager::FlushNow, base::Unretained(this),
callback));
return;
}
QueueFlush(callback);
}
bool StorageManager::GetSchemaVersion(const char* table_name,
int* schema_version) {
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
DCHECK(sql_message_loop_->BelongsToCurrentThread());
DCHECK(schema_version);
if (!SqlQueryTableExists(sql_connection(), table_name)) {
return false;
}
int found_version = SqlQuerySchemaVersion(sql_connection(), table_name);
if (found_version != -1) {
*schema_version = found_version;
} else if (loaded_database_version_ != StorageManager::kDatabaseUserVersion) {
// The schema table did not exist before this session, which is different
// from the schema table being lost.
*schema_version = StorageManager::kSchemaTableIsNew;
} else {
*schema_version = StorageManager::kSchemaVersionLost;
}
return true;
}
void StorageManager::UpdateSchemaVersion(const char* table_name, int version) {
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
DCHECK(sql_message_loop_->BelongsToCurrentThread());
DCHECK_GT(version, 0) << "Schema version numbers must be positive.";
SqlUpdateSchemaVersion(sql_connection(), table_name, version);
}
sql::Connection* StorageManager::sql_connection() {
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
FinishInit();
return connection_.get();
}
void StorageManager::FinishInit() {
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
DCHECK(sql_message_loop_->BelongsToCurrentThread());
if (initialized_) {
return;
}
vfs_.reset(new VirtualFileSystem());
sql_vfs_.reset(new SqlVfs("cobalt_vfs", vfs_.get()));
flush_timer_.reset(new base::OneShotTimer<StorageManager>());
// Savegame has finished loading. Now initialize the database connection.
// Check if this is upgrade data, if so, handle it, otherwise:
// Check if the savegame data contains a VFS header.
// If so, proceed to deserialize it.
// If not, load the file into the VFS directly.
scoped_ptr<Savegame::ByteVector> loaded_raw_bytes =
savegame_thread_->GetLoadedRawBytes();
DCHECK(loaded_raw_bytes);
Savegame::ByteVector& raw_bytes = *loaded_raw_bytes;
VirtualFileSystem::SerializedHeader header = {};
if (raw_bytes.size() > 0) {
const char* buffer = reinterpret_cast<char*>(&raw_bytes[0]);
int buffer_size = static_cast<int>(raw_bytes.size());
// Is this upgrade data?
if (upgrade::UpgradeReader::IsUpgradeData(buffer, buffer_size)) {
upgrade_handler_->OnUpgrade(this, buffer, buffer_size);
} else {
if (raw_bytes.size() >= sizeof(VirtualFileSystem::SerializedHeader)) {
memcpy(&header, &raw_bytes[0],
sizeof(VirtualFileSystem::SerializedHeader));
}
if (!vfs_->Deserialize(&raw_bytes[0], buffer_size)) {
VirtualFile* vf = vfs_->Open(kDefaultSaveFile);
vf->Write(&raw_bytes[0], buffer_size, 0 /* offset */);
}
}
}
std::vector<std::string> filenames = vfs_->ListFiles();
if (filenames.size() == 0) {
filenames.push_back(kDefaultSaveFile);
}
// Legacy Steel save data may contain multiple files (e.g. db-journal as well
// as db), so use the first one that looks like a valid database file.
const std::string& save_name = GetFirstValidDatabaseFile(filenames);
bool ok = connection_->Open(FilePath(save_name));
DCHECK(ok);
// Open() is lazy. Run a quick check to see if the database is valid.
int err = connection_->ExecuteAndReturnErrorCode("pragma schema_version;");
if (err != SQLITE_OK) {
// Database seems to be invalid.
DLOG(WARNING) << "Database " << save_name << " appears to be corrupt.";
// Try to start again. Delete the file in the VFS and make a
// new connection.
vfs_->Delete(save_name);
vfs_->Open(save_name);
connection_.reset(new sql::Connection());
ok = connection_->Open(FilePath(save_name));
DCHECK(ok);
err = connection_->ExecuteAndReturnErrorCode("pragma schema_version;");
DCHECK_EQ(SQLITE_OK, err);
}
// Configure our SQLite database now that it's open.
SqlDisableJournal(connection_.get());
loaded_database_version_ = SqlQueryUserVersion(connection_.get());
SqlCreateSchemaTable(connection_.get());
SqlUpdateDatabaseUserVersion(connection_.get());
initialized_ = true;
}
void StorageManager::OnFlushTimerFired() {
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
DCHECK(sql_message_loop_->BelongsToCurrentThread());
QueueFlush(base::Closure());
}
void StorageManager::OnFlushIOCompletedSQLCallback() {
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
// Make sure this runs on the SQL message loop.
if (MessageLoop::current()->message_loop_proxy() != sql_message_loop_) {
sql_message_loop_->PostTask(
FROM_HERE, base::Bind(&StorageManager::OnFlushIOCompletedSQLCallback,
base::Unretained(this)));
return;
}
flush_processing_ = false;
// Fire all the callbacks waiting on the current flush to complete.
for (size_t i = 0; i < flush_processing_callbacks_.size(); ++i) {
DCHECK(!flush_processing_callbacks_[i].is_null());
flush_processing_callbacks_[i].Run();
}
flush_processing_callbacks_.clear();
if (flush_requested_) {
// If another flush has been requested while we were processing the one that
// just completed, start that next flush now.
flush_processing_callbacks_.swap(flush_requested_callbacks_);
flush_requested_ = false;
FlushInternal();
DCHECK(flush_processing_);
} else {
no_flushes_pending_.Signal();
}
}
void StorageManager::QueueFlush(const base::Closure& callback) {
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
DCHECK(sql_message_loop_->BelongsToCurrentThread());
if (!flush_processing_) {
// If no flush is currently in progress, flush immediately.
if (!callback.is_null()) {
flush_processing_callbacks_.push_back(callback);
}
FlushInternal();
DCHECK(flush_processing_);
} else {
// Otherwise, indicate that we would like to re-flush as soon as the
// current one completes.
flush_requested_ = true;
if (!callback.is_null()) {
flush_requested_callbacks_.push_back(callback);
}
}
}
void StorageManager::FlushInternal() {
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
DCHECK(sql_message_loop_->BelongsToCurrentThread());
FinishInit();
flush_processing_ = true;
no_flushes_pending_.Reset();
// Serialize the database into a buffer. Then send the bytes
// to OnFlushIO for a blocking write to the savegame.
scoped_ptr<Savegame::ByteVector> raw_bytes_ptr;
int size = vfs_->Serialize(NULL, true /*dry_run*/);
raw_bytes_ptr.reset(new Savegame::ByteVector(static_cast<size_t>(size)));
if (size > 0) {
Savegame::ByteVector& raw_bytes = *raw_bytes_ptr;
vfs_->Serialize(&raw_bytes[0], false /*dry_run*/);
}
// Send the savegame bytes off to the SavegameThread object to be
// asynchronously written to the savegame file.
savegame_thread_->Flush(
raw_bytes_ptr.Pass(),
base::Bind(&StorageManager::OnFlushIOCompletedSQLCallback,
base::Unretained(this)));
}
void StorageManager::FinishIO() {
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
DCHECK(!sql_message_loop_->BelongsToCurrentThread());
// The SQL thread may be communicating with the savegame I/O thread still,
// flushing all pending updates. This process can require back and forth
// communication. This method exists to wait for that communication to
// finish and for all pending flushes to complete.
// Start by finishing all commands currently in the sql message loop queue.
// This method is called by the destructor, so the only new tasks posted
// after this one will be generated internally. We need to do this because
// it is possible that there are no flushes pending at this instant, but there
// are tasks queued on |sql_message_loop_| that will begin a flush, and so
// we make sure that these are executed first.
base::WaitableEvent current_queue_finished_event_(true, false);
sql_message_loop_->PostTask(
FROM_HERE,
base::Bind(&base::WaitableEvent::Signal,
base::Unretained(&current_queue_finished_event_)));
current_queue_finished_event_.Wait();
// Now wait for all pending flushes to wrap themselves up. This may involve
// the savegame I/O thread and the SQL thread posting tasks to each other.
no_flushes_pending_.Wait();
}
void StorageManager::OnDestroy() {
TRACE_EVENT0("cobalt::storage", __FUNCTION__);
DCHECK(sql_message_loop_->BelongsToCurrentThread());
// Stop the savegame thread and have it wrap up any pending I/O operations.
savegame_thread_.reset();
// Ensure these objects are destroyed on the proper thread.
flush_timer_.reset(NULL);
sql_vfs_.reset(NULL);
vfs_.reset(NULL);
}
} // namespace storage
} // namespace cobalt