| /* |
| * Copyright 2015 Google Inc. All Rights Reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "cobalt/storage/storage_manager.h" |
| |
| #include <string> |
| #include <vector> |
| |
| #include "base/bind.h" |
| #include "base/debug/trace_event.h" |
| #include "base/stringprintf.h" |
| #include "cobalt/storage/savegame_thread.h" |
| #include "cobalt/storage/virtual_file.h" |
| #include "cobalt/storage/virtual_file_system.h" |
| #include "sql/statement.h" |
| #include "third_party/sqlite/sqlite3.h" |
| |
| namespace cobalt { |
| namespace storage { |
| |
| namespace { |
| |
// Name of the file used for the database inside the virtual file system when
// the save data does not name one itself.
const char kDefaultSaveFile[] = "cobalt_save.bin";

// How long, in milliseconds, Flush() waits before actually writing.
// Flush() calls tend to arrive in bursts, so a short delay keeps us from
// spamming writes to disk.
const int kDatabaseFlushDelayMs = 100;
| |
| void SqlDisableJournal(sql::Connection* connection) { |
| // Disable journaling for our in-memory database. |
| sql::Statement disable_journal( |
| connection->GetUniqueStatement("PRAGMA journal_mode=OFF")); |
| bool ok = disable_journal.Step(); |
| DCHECK(ok); |
| } |
| |
| int SqlQueryUserVersion(sql::Connection* connection) { |
| sql::Statement get_db_version( |
| connection->GetUniqueStatement("PRAGMA user_version")); |
| bool ok = get_db_version.Step(); |
| DCHECK(ok); |
| return get_db_version.ColumnInt(0); |
| } |
| |
| bool SqlQueryTableExists(sql::Connection* connection, const char* table_name) { |
| sql::Statement get_exists(connection->GetUniqueStatement( |
| "SELECT name FROM sqlite_master WHERE name = ? AND type = 'table'")); |
| get_exists.BindString(0, table_name); |
| return get_exists.Step(); |
| } |
| |
| int SqlQuerySchemaVersion(sql::Connection* connection, const char* table_name) { |
| sql::Statement get_version(connection->GetUniqueStatement( |
| "SELECT version FROM SchemaTable WHERE name = ?")); |
| get_version.BindString(0, table_name); |
| bool row_found = get_version.Step(); |
| if (row_found) { |
| return get_version.ColumnInt(0); |
| } else { |
| return -1; |
| } |
| } |
| |
| void SqlUpdateSchemaVersion(sql::Connection* connection, const char* table_name, |
| int version) { |
| sql::Statement update_version(connection->GetUniqueStatement( |
| "INSERT INTO SchemaTable (name, version)" |
| "VALUES (?, ?)")); |
| update_version.BindString(0, table_name); |
| update_version.BindInt(1, version); |
| bool ok = update_version.Run(); |
| DCHECK(ok); |
| } |
| |
| void SqlCreateSchemaTable(sql::Connection* connection) { |
| // Create the schema table. |
| sql::Statement create_table(connection->GetUniqueStatement( |
| "CREATE TABLE IF NOT EXISTS SchemaTable (" |
| "name TEXT, " |
| "version INTEGER, " |
| "UNIQUE(name, version) ON CONFLICT REPLACE)")); |
| bool ok = create_table.Run(); |
| DCHECK(ok); |
| } |
| |
| void SqlUpdateDatabaseUserVersion(sql::Connection* connection) { |
| // Update the DB version which will be read in next time. |
| // NOTE: Pragma statements cannot be bound, so we must construct the string |
| // in full. |
| std::string set_db_version_str = base::StringPrintf( |
| "PRAGMA user_version = %d", StorageManager::kDatabaseUserVersion); |
| sql::Statement set_db_version( |
| connection->GetUniqueStatement(set_db_version_str.c_str())); |
| bool ok = set_db_version.Run(); |
| DCHECK(ok); |
| } |
| |
| const std::string& GetFirstValidDatabaseFile( |
| const std::vector<std::string>& filenames) { |
| // Caller must ensure at least one file exists. |
| DCHECK_GT(filenames.size(), size_t(0)); |
| |
| for (size_t i = 0; i < filenames.size(); ++i) { |
| sql::Connection connection; |
| bool is_opened = connection.Open(FilePath(filenames[i])); |
| if (!is_opened) { |
| continue; |
| } |
| int err = connection.ExecuteAndReturnErrorCode("pragma schema_version;"); |
| if (err != SQLITE_OK) { |
| continue; |
| } |
| // File can be opened as a database. |
| return filenames[i]; |
| } |
| |
| // Caller must handle case where a valid database file cannot be found. |
| DLOG(WARNING) << "Cannot find valid database file in save data"; |
| return filenames[0]; |
| } |
| |
| } // namespace |
| |
| StorageManager::StorageManager(const Options& options) |
| : options_(options), |
| sql_thread_(new base::Thread("StorageManager SQL")), |
| ALLOW_THIS_IN_INITIALIZER_LIST(sql_context_(new SqlContext(this))), |
| connection_(new sql::Connection()), |
| loaded_database_version_(0), |
| initialized_(false), |
| flush_processing_(false), |
| flush_requested_(false), |
| no_flushes_pending_(true /* manual reset */, |
| true /* initially signalled */) { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| savegame_thread_.reset(new SavegameThread(options_.savegame_options)); |
| // Start the savegame load immediately. |
| sql_thread_->Start(); |
| sql_message_loop_ = sql_thread_->message_loop_proxy(); |
| } |
| |
| StorageManager::~StorageManager() { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| DCHECK(!sql_message_loop_->BelongsToCurrentThread()); |
| |
| // Wait for all I/O operations to complete. |
| FinishIO(); |
| |
| // Destroy various objects on the proper thread. |
| sql_message_loop_->PostTask(FROM_HERE, base::Bind(&StorageManager::OnDestroy, |
| base::Unretained(this))); |
| |
| // Force all tasks to finish. Then we can safely let the rest of our |
| // member variables be destroyed. |
| sql_thread_.reset(); |
| } |
| |
| void StorageManager::GetSqlContext(const SqlCallback& callback) { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| sql_message_loop_->PostTask(FROM_HERE, |
| base::Bind(callback, sql_context_.get())); |
| } |
| |
| void StorageManager::Flush() { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| // Make sure this runs on the correct thread. |
| if (MessageLoop::current()->message_loop_proxy() != sql_message_loop_) { |
| sql_message_loop_->PostTask( |
| FROM_HERE, base::Bind(&StorageManager::Flush, base::Unretained(this))); |
| return; |
| } |
| |
| flush_timer_->Start(FROM_HERE, |
| base::TimeDelta::FromMilliseconds(kDatabaseFlushDelayMs), |
| this, &StorageManager::OnFlushTimerFired); |
| } |
| |
| void StorageManager::FlushNow(const base::Closure& callback) { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| // Make sure this runs on the correct thread. |
| if (MessageLoop::current()->message_loop_proxy() != sql_message_loop_) { |
| sql_message_loop_->PostTask( |
| FROM_HERE, base::Bind(&StorageManager::FlushNow, base::Unretained(this), |
| callback)); |
| return; |
| } |
| |
| QueueFlush(callback); |
| } |
| |
| bool StorageManager::GetSchemaVersion(const char* table_name, |
| int* schema_version) { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| DCHECK(sql_message_loop_->BelongsToCurrentThread()); |
| DCHECK(schema_version); |
| |
| if (!SqlQueryTableExists(sql_connection(), table_name)) { |
| return false; |
| } |
| |
| int found_version = SqlQuerySchemaVersion(sql_connection(), table_name); |
| if (found_version != -1) { |
| *schema_version = found_version; |
| } else if (loaded_database_version_ != StorageManager::kDatabaseUserVersion) { |
| // The schema table did not exist before this session, which is different |
| // from the schema table being lost. |
| *schema_version = StorageManager::kSchemaTableIsNew; |
| } else { |
| *schema_version = StorageManager::kSchemaVersionLost; |
| } |
| return true; |
| } |
| |
| void StorageManager::UpdateSchemaVersion(const char* table_name, int version) { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| DCHECK(sql_message_loop_->BelongsToCurrentThread()); |
| DCHECK_GT(version, 0) << "Schema version numbers must be positive."; |
| |
| SqlUpdateSchemaVersion(sql_connection(), table_name, version); |
| } |
| |
| sql::Connection* StorageManager::sql_connection() { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| FinishInit(); |
| return connection_.get(); |
| } |
| |
| void StorageManager::FinishInit() { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| DCHECK(sql_message_loop_->BelongsToCurrentThread()); |
| if (initialized_) { |
| return; |
| } |
| |
| vfs_.reset(new VirtualFileSystem()); |
| sql_vfs_.reset(new SqlVfs("cobalt_vfs", vfs_.get())); |
| flush_timer_.reset(new base::OneShotTimer<StorageManager>()); |
| // Savegame has finished loading. Now initialize the database connection. |
| // Check if the savegame data contains a VFS header. |
| // If so, proceed to deserialize it. |
| // If not, load the file into the VFS directly. |
| scoped_ptr<Savegame::ByteVector> loaded_raw_bytes = |
| savegame_thread_->GetLoadedRawBytes(); |
| DCHECK(loaded_raw_bytes); |
| Savegame::ByteVector& raw_bytes = *loaded_raw_bytes; |
| VirtualFileSystem::SerializedHeader header = {}; |
| if (raw_bytes.size() > 0) { |
| if (raw_bytes.size() >= sizeof(VirtualFileSystem::SerializedHeader)) { |
| memcpy(&header, &raw_bytes[0], |
| sizeof(VirtualFileSystem::SerializedHeader)); |
| } |
| |
| if (VirtualFileSystem::GetHeaderVersion(header) == -1) { |
| VirtualFile* vf = vfs_->Open(kDefaultSaveFile); |
| vf->Write(&raw_bytes[0], static_cast<int>(raw_bytes.size()), |
| 0 /* offset */); |
| } else { |
| vfs_->Deserialize(&raw_bytes[0], static_cast<int>(raw_bytes.size())); |
| } |
| } |
| |
| std::vector<std::string> filenames = vfs_->ListFiles(); |
| if (filenames.size() == 0) { |
| filenames.push_back(kDefaultSaveFile); |
| } |
| |
| // Legacy Steel save data may contain multiple files (e.g. db-journal as well |
| // as db), so use the first one that looks like a valid database file. |
| const std::string& save_name = GetFirstValidDatabaseFile(filenames); |
| bool ok = connection_->Open(FilePath(save_name)); |
| DCHECK(ok); |
| |
| // Open() is lazy. Run a quick check to see if the database is valid. |
| int err = connection_->ExecuteAndReturnErrorCode("pragma schema_version;"); |
| if (err != SQLITE_OK) { |
| // Database seems to be invalid. |
| DLOG(WARNING) << "Database " << save_name << " appears to be corrupt."; |
| // Try to start again. Delete the file in the VFS and make a |
| // new connection. |
| vfs_->Delete(save_name); |
| vfs_->Open(save_name); |
| connection_.reset(new sql::Connection()); |
| ok = connection_->Open(FilePath(save_name)); |
| DCHECK(ok); |
| err = connection_->ExecuteAndReturnErrorCode("pragma schema_version;"); |
| DCHECK_EQ(SQLITE_OK, err); |
| } |
| |
| // Configure our SQLite database now that it's open. |
| SqlDisableJournal(connection_.get()); |
| loaded_database_version_ = SqlQueryUserVersion(connection_.get()); |
| SqlCreateSchemaTable(connection_.get()); |
| SqlUpdateDatabaseUserVersion(connection_.get()); |
| |
| initialized_ = true; |
| } |
| |
| void StorageManager::OnFlushTimerFired() { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| DCHECK(sql_message_loop_->BelongsToCurrentThread()); |
| |
| QueueFlush(base::Closure()); |
| } |
| |
| void StorageManager::OnFlushIOCompletedSQLCallback() { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| // Make sure this runs on the SQL message loop. |
| if (MessageLoop::current()->message_loop_proxy() != sql_message_loop_) { |
| sql_message_loop_->PostTask( |
| FROM_HERE, base::Bind(&StorageManager::OnFlushIOCompletedSQLCallback, |
| base::Unretained(this))); |
| return; |
| } |
| |
| flush_processing_ = false; |
| |
| // Fire all the callbacks waiting on the current flush to complete. |
| for (size_t i = 0; i < flush_processing_callbacks_.size(); ++i) { |
| DCHECK(!flush_processing_callbacks_[i].is_null()); |
| flush_processing_callbacks_[i].Run(); |
| } |
| flush_processing_callbacks_.clear(); |
| |
| if (flush_requested_) { |
| // If another flush has been requested while we were processing the one that |
| // just completed, start that next flush now. |
| flush_processing_callbacks_.swap(flush_requested_callbacks_); |
| flush_requested_ = false; |
| FlushInternal(); |
| DCHECK(flush_processing_); |
| } else { |
| no_flushes_pending_.Signal(); |
| } |
| } |
| |
| void StorageManager::QueueFlush(const base::Closure& callback) { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| DCHECK(sql_message_loop_->BelongsToCurrentThread()); |
| |
| if (!flush_processing_) { |
| // If no flush is currently in progress, flush immediately. |
| if (!callback.is_null()) { |
| flush_processing_callbacks_.push_back(callback); |
| } |
| FlushInternal(); |
| DCHECK(flush_processing_); |
| } else { |
| // Otherwise, indicate that we would like to re-flush as soon as the |
| // current one completes. |
| flush_requested_ = true; |
| if (!callback.is_null()) { |
| flush_requested_callbacks_.push_back(callback); |
| } |
| } |
| } |
| |
| void StorageManager::FlushInternal() { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| DCHECK(sql_message_loop_->BelongsToCurrentThread()); |
| FinishInit(); |
| |
| flush_processing_ = true; |
| no_flushes_pending_.Reset(); |
| |
| // Serialize the database into a buffer. Then send the bytes |
| // to OnFlushIO for a blocking write to the savegame. |
| scoped_ptr<Savegame::ByteVector> raw_bytes_ptr; |
| int size = vfs_->Serialize(NULL, true /*dry_run*/); |
| raw_bytes_ptr.reset(new Savegame::ByteVector(static_cast<size_t>(size))); |
| if (size > 0) { |
| Savegame::ByteVector& raw_bytes = *raw_bytes_ptr; |
| vfs_->Serialize(&raw_bytes[0], false /*dry_run*/); |
| } |
| |
| // Send the savegame bytes off to the SavegameThread object to be |
| // asynchronously written to the savegame file. |
| savegame_thread_->Flush( |
| raw_bytes_ptr.Pass(), |
| base::Bind(&StorageManager::OnFlushIOCompletedSQLCallback, |
| base::Unretained(this))); |
| } |
| |
| void StorageManager::FinishIO() { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| DCHECK(!sql_message_loop_->BelongsToCurrentThread()); |
| |
| // The SQL thread may be communicating with the savegame I/O thread still, |
| // flushing all pending updates. This process can require back and forth |
| // communication. This method exists to wait for that communication to |
| // finish and for all pending flushes to complete. |
| |
| // Start by finishing all commands currently in the sql message loop queue. |
| // This method is called by the destructor, so the only new tasks posted |
| // after this one will be generated internally. We need to do this because |
| // it is possible that there are no flushes pending at this instant, but there |
| // are tasks queued on |sql_message_loop_| that will begin a flush, and so |
| // we make sure that these are executed first. |
| base::WaitableEvent current_queue_finished_event_(true, false); |
| sql_message_loop_->PostTask( |
| FROM_HERE, |
| base::Bind(&base::WaitableEvent::Signal, |
| base::Unretained(¤t_queue_finished_event_))); |
| current_queue_finished_event_.Wait(); |
| |
| // Now wait for all pending flushes to wrap themselves up. This may involve |
| // the savegame I/O thread and the SQL thread posting tasks to each other. |
| no_flushes_pending_.Wait(); |
| } |
| |
| void StorageManager::OnDestroy() { |
| TRACE_EVENT0("cobalt::storage", __FUNCTION__); |
| DCHECK(sql_message_loop_->BelongsToCurrentThread()); |
| |
| // Stop the savegame thread and have it wrap up any pending I/O operations. |
| savegame_thread_.reset(); |
| |
| // Ensure these objects are destroyed on the proper thread. |
| flush_timer_.reset(NULL); |
| sql_vfs_.reset(NULL); |
| vfs_.reset(NULL); |
| } |
| |
| } // namespace storage |
| } // namespace cobalt |