Skip to content

Commit

Permalink
[RocksDB] Use raw pointer instead of shared pointer when passing Stat…
Browse files Browse the repository at this point in the history
…istics object internally

Summary: liveness of the statistics object is already ensured by the shared pointer in DB options. There's no reason to pass again shared pointer among internal functions. Raw pointer is sufficient and efficient.

Test Plan: make check

Reviewers: dhruba, MarkCallaghan, igor

Reviewed By: dhruba

CC: leveldb, reconnect.grayhat

Differential Revision: https://reviews.facebook.net/D14289
  • Loading branch information
haoboxu committed Nov 25, 2013
1 parent 0c93df9 commit 5b825d6
Show file tree
Hide file tree
Showing 14 changed files with 117 additions and 90 deletions.
5 changes: 3 additions & 2 deletions db/builder.cc
Expand Up @@ -112,6 +112,7 @@ Status BuildTable(const std::string& dbname,

if (this_ikey.type == kTypeMerge) {
// Handle merge-type keys using the MergeHelper
// TODO: pass statistics to MergeUntil
merge.MergeUntil(iter, 0 /* don't worry about snapshot */);
iterator_at_next = true;
if (merge.IsSuccess()) {
Expand Down Expand Up @@ -188,10 +189,10 @@ Status BuildTable(const std::string& dbname,
// Finish and check for file errors
if (s.ok() && !options.disableDataSync) {
if (options.use_fsync) {
StopWatch sw(env, options.statistics, TABLE_SYNC_MICROS);
StopWatch sw(env, options.statistics.get(), TABLE_SYNC_MICROS);
s = file->Fsync();
} else {
StopWatch sw(env, options.statistics, TABLE_SYNC_MICROS);
StopWatch sw(env, options.statistics.get(), TABLE_SYNC_MICROS);
s = file->Sync();
}
}
Expand Down
75 changes: 42 additions & 33 deletions db/db_impl.cc
Expand Up @@ -404,7 +404,7 @@ const Status DBImpl::CreateArchivalDirectory() {
}

void DBImpl::PrintStatistics() {
auto dbstats = options_.statistics;
auto dbstats = options_.statistics.get();
if (dbstats) {
Log(options_.info_log,
"STATISTCS:\n %s",
Expand Down Expand Up @@ -860,7 +860,7 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}
SetTickerCount(options_.statistics, SEQUENCE_NUMBER,
SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER,
versions_->LastSequence());
}
}
Expand Down Expand Up @@ -1297,7 +1297,7 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {
Status DBImpl::GetUpdatesSince(SequenceNumber seq,
unique_ptr<TransactionLogIterator>* iter) {

RecordTick(options_.statistics, GET_UPDATES_SINCE_CALLS);
RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS);
if (seq > versions_->LastSequence()) {
return Status::IOError("Requested sequence not yet written in the db");
}
Expand Down Expand Up @@ -1971,10 +1971,12 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
// Finish and check for file errors
if (s.ok() && !options_.disableDataSync) {
if (options_.use_fsync) {
StopWatch sw(env_, options_.statistics, COMPACTION_OUTFILE_SYNC_MICROS);
StopWatch sw(env_, options_.statistics.get(),
COMPACTION_OUTFILE_SYNC_MICROS);
s = compact->outfile->Fsync();
} else {
StopWatch sw(env_, options_.statistics, COMPACTION_OUTFILE_SYNC_MICROS);
StopWatch sw(env_, options_.statistics.get(),
COMPACTION_OUTFILE_SYNC_MICROS);
s = compact->outfile->Sync();
}
}
Expand Down Expand Up @@ -2212,7 +2214,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
ParseInternalKey(key, &ikey);
// no value associated with delete
value.clear();
RecordTick(options_.statistics, COMPACTION_KEY_DROP_USER);
RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_USER);
} else if (value_changed) {
value = compaction_filter_value;
}
Expand All @@ -2238,7 +2240,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// TODO: why not > ?
assert(last_sequence_for_key >= ikey.sequence);
drop = true; // (A)
RecordTick(options_.statistics, COMPACTION_KEY_DROP_NEWER_ENTRY);
RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_NEWER_ENTRY);
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= earliest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
Expand All @@ -2250,7 +2252,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
RecordTick(options_.statistics, COMPACTION_KEY_DROP_OBSOLETE);
RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_OBSOLETE);
} else if (ikey.type == kTypeMerge) {
// We know the merge type entry is not hidden, otherwise we would
// have hit (A)
Expand All @@ -2259,7 +2261,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// logic could also be nicely re-used for memtable flush purge
// optimization in BuildTable.
merge.MergeUntil(input.get(), prev_snapshot, bottommost_level,
options_.statistics);
options_.statistics.get());
current_entry_is_merging = true;
if (merge.IsSuccess()) {
// Successfully found Put/Delete/(end-of-key-range) while merging
Expand Down Expand Up @@ -2412,8 +2414,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,

CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros - imm_micros;
if (options_.statistics) {
options_.statistics->measureTime(COMPACTION_TIME, stats.micros);
if (options_.statistics.get()) {
options_.statistics.get()->measureTime(COMPACTION_TIME, stats.micros);
}
stats.files_in_leveln = compact->compaction->num_input_files(0);
stats.files_in_levelnp1 = compact->compaction->num_input_files(1);
Expand Down Expand Up @@ -2554,7 +2556,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
bool* value_found) {
Status s;

StopWatch sw(env_, options_.statistics, DB_GET);
StopWatch sw(env_, options_.statistics.get(), DB_GET);
SequenceNumber snapshot;
mutex_.Lock();
if (options.snapshot != nullptr) {
Expand Down Expand Up @@ -2605,16 +2607,16 @@ Status DBImpl::GetImpl(const ReadOptions& options,

LogFlush(options_.info_log);
// Note, tickers are atomic now - no lock protection needed any more.
RecordTick(options_.statistics, NUMBER_KEYS_READ);
RecordTick(options_.statistics, BYTES_READ, value->size());
RecordTick(options_.statistics.get(), NUMBER_KEYS_READ);
RecordTick(options_.statistics.get(), BYTES_READ, value->size());
return s;
}

std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
const std::vector<Slice>& keys,
std::vector<std::string>* values) {

StopWatch sw(env_, options_.statistics, DB_MULTIGET);
StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET);
SequenceNumber snapshot;
mutex_.Lock();
if (options.snapshot != nullptr) {
Expand Down Expand Up @@ -2683,9 +2685,9 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
mutex_.Unlock();

LogFlush(options_.info_log);
RecordTick(options_.statistics, NUMBER_MULTIGET_CALLS);
RecordTick(options_.statistics, NUMBER_MULTIGET_KEYS_READ, numKeys);
RecordTick(options_.statistics, NUMBER_MULTIGET_BYTES_READ, bytesRead);
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS);
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, numKeys);
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_BYTES_READ, bytesRead);

return statList;
}
Expand Down Expand Up @@ -2760,7 +2762,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
w.disableWAL = options.disableWAL;
w.done = false;

StopWatch sw(env_, options_.statistics, DB_WRITE);
StopWatch sw(env_, options_.statistics.get(), DB_WRITE);
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
Expand Down Expand Up @@ -2793,8 +2795,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
int my_batch_count = WriteBatchInternal::Count(updates);
last_sequence += my_batch_count;
// Record statistics
RecordTick(options_.statistics, NUMBER_KEYS_WRITTEN, my_batch_count);
RecordTick(options_.statistics,
RecordTick(options_.statistics.get(),
NUMBER_KEYS_WRITTEN, my_batch_count);
RecordTick(options_.statistics.get(),
BYTES_WRITTEN,
WriteBatchInternal::ByteSize(updates));
if (options.disableWAL) {
Expand All @@ -2808,10 +2811,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
BumpPerfTime(&perf_context.wal_write_time, &timer);
if (status.ok() && options.sync) {
if (options_.use_fsync) {
StopWatch(env_, options_.statistics, WAL_FILE_SYNC_MICROS);
StopWatch(env_, options_.statistics.get(), WAL_FILE_SYNC_MICROS);
status = log_->file()->Fsync();
} else {
StopWatch(env_, options_.statistics, WAL_FILE_SYNC_MICROS);
StopWatch(env_, options_.statistics.get(), WAL_FILE_SYNC_MICROS);
status = log_->file()->Sync();
}
}
Expand All @@ -2826,7 +2829,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// have succeeded in memtable but Status reports error for all writes.
throw std::runtime_error("In memory WriteBatch corruption!");
}
SetTickerCount(options_.statistics, SEQUENCE_NUMBER, last_sequence);
SetTickerCount(options_.statistics.get(),
SEQUENCE_NUMBER, last_sequence);
}
LogFlush(options_.info_log);
mutex_.Lock();
Expand Down Expand Up @@ -2975,15 +2979,15 @@ Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.Unlock();
uint64_t delayed;
{
StopWatch sw(env_, options_.statistics, STALL_L0_SLOWDOWN_COUNT);
StopWatch sw(env_, options_.statistics.get(), STALL_L0_SLOWDOWN_COUNT);
env_->SleepForMicroseconds(
SlowdownAmount(versions_->NumLevelFiles(0),
options_.level0_slowdown_writes_trigger,
options_.level0_stop_writes_trigger)
);
delayed = sw.ElapsedMicros();
}
RecordTick(options_.statistics, STALL_L0_SLOWDOWN_MICROS, delayed);
RecordTick(options_.statistics.get(), STALL_L0_SLOWDOWN_MICROS, delayed);
stall_level0_slowdown_ += delayed;
stall_level0_slowdown_count_++;
allow_delay = false; // Do not delay a single write more than once
Expand All @@ -3003,12 +3007,13 @@ Status DBImpl::MakeRoomForWrite(bool force) {
Log(options_.info_log, "wait for memtable compaction...\n");
uint64_t stall;
{
StopWatch sw(env_, options_.statistics,
StopWatch sw(env_, options_.statistics.get(),
STALL_MEMTABLE_COMPACTION_COUNT);
bg_cv_.Wait();
stall = sw.ElapsedMicros();
}
RecordTick(options_.statistics, STALL_MEMTABLE_COMPACTION_MICROS, stall);
RecordTick(options_.statistics.get(),
STALL_MEMTABLE_COMPACTION_MICROS, stall);
stall_memtable_compaction_ += stall;
stall_memtable_compaction_count_++;
} else if (versions_->NumLevelFiles(0) >=
Expand All @@ -3018,11 +3023,12 @@ Status DBImpl::MakeRoomForWrite(bool force) {
Log(options_.info_log, "wait for fewer level0 files...\n");
uint64_t stall;
{
StopWatch sw(env_, options_.statistics, STALL_L0_NUM_FILES_COUNT);
StopWatch sw(env_, options_.statistics.get(),
STALL_L0_NUM_FILES_COUNT);
bg_cv_.Wait();
stall = sw.ElapsedMicros();
}
RecordTick(options_.statistics, STALL_L0_NUM_FILES_MICROS, stall);
RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall);
stall_level0_num_files_ += stall;
stall_level0_num_files_count_++;
} else if (
Expand All @@ -3034,7 +3040,8 @@ Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.Unlock();
uint64_t delayed;
{
StopWatch sw(env_, options_.statistics, HARD_RATE_LIMIT_DELAY_COUNT);
StopWatch sw(env_, options_.statistics.get(),
HARD_RATE_LIMIT_DELAY_COUNT);
env_->SleepForMicroseconds(1000);
delayed = sw.ElapsedMicros();
}
Expand All @@ -3043,7 +3050,8 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// Make sure the following value doesn't round to zero.
uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1);
rate_limit_delay_millis += rate_limit;
RecordTick(options_.statistics, RATE_LIMIT_DELAY_MILLIS, rate_limit);
RecordTick(options_.statistics.get(),
RATE_LIMIT_DELAY_MILLIS, rate_limit);
if (options_.rate_limit_delay_max_milliseconds > 0 &&
rate_limit_delay_millis >=
(unsigned)options_.rate_limit_delay_max_milliseconds) {
Expand All @@ -3058,7 +3066,8 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// TODO: add statistics
mutex_.Unlock();
{
StopWatch sw(env_, options_.statistics, SOFT_RATE_LIMIT_DELAY_COUNT);
StopWatch sw(env_, options_.statistics.get(),
SOFT_RATE_LIMIT_DELAY_COUNT);
env_->SleepForMicroseconds(SlowdownAmount(
score,
options_.soft_rate_limit,
Expand Down
4 changes: 2 additions & 2 deletions db/db_iter.cc
Expand Up @@ -69,7 +69,7 @@ class DBIter: public Iterator {
direction_(kForward),
valid_(false),
current_entry_is_merged_(false),
statistics_(options.statistics) {
statistics_(options.statistics.get()) {
RecordTick(statistics_, NO_ITERATORS, 1);
max_skip_ = options.max_sequential_skip_in_iterations;
}
Expand Down Expand Up @@ -135,7 +135,7 @@ class DBIter: public Iterator {
Direction direction_;
bool valid_;
bool current_entry_is_merged_;
std::shared_ptr<Statistics> statistics_;
Statistics* statistics_;
uint64_t max_skip_;

// No copying allowed
Expand Down
5 changes: 3 additions & 2 deletions db/memtable.cc
Expand Up @@ -19,6 +19,7 @@
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/murmurhash.h"
#include "util/statistics_imp.h"

namespace std {
template <>
Expand Down Expand Up @@ -203,7 +204,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
assert(merge_operator);
if (!merge_operator->FullMerge(key.user_key(), &v, *operands,
value, logger.get())) {
RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES);
*s = Status::Corruption("Error: Could not perform merge.");
}
} else {
Expand All @@ -220,7 +221,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
*s = Status::OK();
if (!merge_operator->FullMerge(key.user_key(), nullptr, *operands,
value, logger.get())) {
RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES);
*s = Status::Corruption("Error: Could not perform merge.");
}
} else {
Expand Down
3 changes: 2 additions & 1 deletion db/merge_helper.cc
Expand Up @@ -8,6 +8,7 @@
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
#include "util/statistics_imp.h"
#include <string>
#include <stdio.h>

Expand All @@ -20,7 +21,7 @@ namespace rocksdb {
// operands_ stores the list of merge operands encountered while merging.
// keys_[i] corresponds to operands_[i] for each i.
void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
bool at_bottom, shared_ptr<Statistics> stats) {
bool at_bottom, Statistics* stats) {
// Get a copy of the internal key, before it's invalidated by iter->Next()
// Also maintain the list of merge operands seen.
keys_.clear();
Expand Down
4 changes: 2 additions & 2 deletions db/merge_helper.h
Expand Up @@ -8,7 +8,6 @@

#include "db/dbformat.h"
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
#include <string>
#include <deque>

Expand All @@ -18,6 +17,7 @@ class Comparator;
class Iterator;
class Logger;
class MergeOperator;
class Statistics;

class MergeHelper {
public:
Expand Down Expand Up @@ -46,7 +46,7 @@ class MergeHelper {
// at_bottom: (IN) true if the iterator covers the bottem level, which means
// we could reach the start of the history of this user key.
void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0,
bool at_bottom = false, shared_ptr<Statistics> stats=nullptr);
bool at_bottom = false, Statistics* stats = nullptr);

// Query the merge result
// These are valid until the next MergeUntil call
Expand Down
6 changes: 3 additions & 3 deletions db/table_cache.cc
Expand Up @@ -65,20 +65,20 @@ Status TableCache::FindTable(const EnvOptions& toptions,
unique_ptr<RandomAccessFile> file;
unique_ptr<TableReader> table_reader;
s = env_->NewRandomAccessFile(fname, &file, toptions);
RecordTick(options_->statistics, NO_FILE_OPENS);
RecordTick(options_->statistics.get(), NO_FILE_OPENS);
if (s.ok()) {
if (options_->advise_random_on_open) {
file->Hint(RandomAccessFile::RANDOM);
}
StopWatch sw(env_, options_->statistics, TABLE_OPEN_IO_MICROS);
StopWatch sw(env_, options_->statistics.get(), TABLE_OPEN_IO_MICROS);
s = options_->table_factory->GetTableReader(*options_, toptions,
std::move(file), file_size,
&table_reader);
}

if (!s.ok()) {
assert(table_reader == nullptr);
RecordTick(options_->statistics, NO_FILE_ERRORS);
RecordTick(options_->statistics.get(), NO_FILE_ERRORS);
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
} else {
Expand Down

0 comments on commit 5b825d6

Please sign in to comment.