leveldb - env


env.h

单看这个头文件,没什么特别的,也只是一些接口而已,但是把codingenv这两个的头文件比较就会发现都是这个结构

  • 接口类
  • 包装类

但是,env.h要复杂些,因为这里面除了FileLockEnvWrapper以外,剩下的全是只有接口的虚基类
此外,除了env.cc外,在支持POSIX的平台上还有env_posix.cc需要看看。

env_posix.cc

打开文件,看到如下代码

namespace leveldb {

namespace {

static Status IOError(const std::string& context, int err_number) {
  return Status::IOError(context, strerror(err_number));
}
// something omitted
}

namespace leveldb中又来一个匿名命名空间,而在其省略部分还包含有一个匿名命名空间,看起来是这样

namespace leveldb {
  namespace {
    ...
    namespace {
      ...
    }
  }
  ...
}

这里面又有多个static修饰的函数,戳到痛点,赶紧Google了下,这坑待填

接着看,Status::IOError也是个static修饰的成员函数,第一个参数是错误的上下文(这里通常是文件名),第二个参数是错误描述。

顺序读
class PosixSequentialFile: public SequentialFile { // something omitted };

实现虚方法,值得一提的是其中Read函数中使用了一个fread_unlocked函数,刚开始还以为是作者的一个define (#define fread fread_unlocked),后来发现这是一个GNU的扩展函数,并且不推荐使用。。。

随机读

RandomAccessFile的一个派生类

// pread() based random-access
class PosixRandomAccessFile: public RandomAccessFile {
 private:
  std::string filename_;
  int fd_;

 public:
  PosixRandomAccessFile(const std::string& fname, int fd)
      : filename_(fname), fd_(fd) { }
  virtual ~PosixRandomAccessFile() { close(fd_); }

  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
    *result = Slice(scratch, (r < 0) ? 0 : r);
    if (r < 0) {
      // An error: return a non-ok status
      s = IOError(filename_, errno);
    }
    return s;
  }
};

没什么好解释的,就是用pread实现的。

内存映射限制器

全程有互斥锁和原子变量(或memory barrier)保护

// Up to 1000 mmaps for 64-bit binaries; none for smaller pointer sizes.
MmapLimiter() {
  SetAllowed(sizeof(void*) >= 8 ? 1000 : 0);
}

为64位或更大位宽的机器设置上限,否则不使用不使用mmap而是使用上面由pread实现的PosixRandomAccessFile (在Env的派生类中)。这个类中的私有成员port::AtomicPointer allowed_是限制器的计数,当这个计数达到上限后,无论指针宽度是多少都将使用pread方式。

基于mmap的随机读

这是RandomAccessFile的另一个派生类,实现同样的Read接口,通过

*result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);

增加偏移量+强制转换来获得文件映射到虚拟内存的中的数据,其长度为n。

写文件
class PosixWritableFile : public WritableFile {
 private:
  std::string filename_;
  FILE* file_;
}

私有成员变量接受一个由fopen返回的文件指针,并在析构函数中将其关闭。同样这里面值得一体的是fwrite_unlockedfflush_unlocked以及两个文件同步函数fsyncfdatasync

辅助函数LockOrUnlock

一个由fcntl操纵的文件锁开关

文件锁集合
// Set of locked files.  We keep a separate set instead of just
// relying on fcntrl(F_SETLK) since fcntl(F_SETLK) does not provide
// any protection against multiple uses from the same process.
class PosixLockTable {
 private:
  port::Mutex mu_;
  std::set<std::string> locked_files_;
 public:
  bool Insert(const std::string& fname) {
    MutexLock l(&mu_);
    return locked_files_.insert(fname).second;
  }
  void Remove(const std::string& fname) {
    MutexLock l(&mu_);
    locked_files_.erase(fname);
  }
}

值得一提的是作者使用std::set来作为存储容器,并且Insert的返回值可以指示是否插入成功(denoting whether the insertion took place)。

Env的POSIX实现

class PosxiEnv : public Env实现了基类的接口,这些接口大都是由上面提到的实现来实现的,主要包括有简单的文件读写、创建和删除以及目录的读取创建和删除和文件加锁及解锁,这些都很简单,所以忽略掉。下面来看看这个派生类独有的线程相关的几个函数。
辅助函数PthreadCall

void PthreadCall(const char* label, int result) {
  if (result != 0) {
    fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
    abort();
  }
}

第一个参数是消息类型,后一个参数来源于系统调用的返回值,在strerror中返回对应的内容。这个函数用来简化错误处理。
辅助函数BGThreadWrapper

static void* BGThreadWrapper(void* arg) {
  reinterpret_cast<PosixEnv*>(arg)->BGThread();
  return NULL;
}

执行线程。
消费线程函数BGThread

void PosixEnv::BGThread() {
  while (true) {
    // Wait until there is an item that is ready to run
    PthreadCall("lock", pthread_mutex_lock(&mu_));
    while (queue_.empty()) {
      PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
    }

    void (*function)(void*) = queue_.front().function;
    void* arg = queue_.front().arg;
    queue_.pop_front();

    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
    (*function)(arg);
  }
}

假如一切顺利,这个函数获得了锁,并且队列不为空,那么这个函数将使用队列中存放的参数以及回调函数指针来调用回调函数,然后回去再次等待获取锁。从这里可以看出,此函数是一个线程函数(消费者),它负责消耗队列中传来的数据struct BGItem { void* arg; void (*function)(void*); }; 。既然有消费者那么就会有生产者了。
生产线程函数Schedule

void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  PthreadCall("lock", pthread_mutex_lock(&mu_));

  // Start background thread if necessary
  if (!started_bgthread_) {
    started_bgthread_ = true;
    PthreadCall(
        "create thread",
        pthread_create(&bgthread_, NULL,  &PosixEnv::BGThreadWrapper, this));
  }

  // If the queue is currently empty, the background thread may currently be
  // waiting.
  if (queue_.empty()) {
    PthreadCall("signal", pthread_cond_signal(&bgsignal_));
  }

  // Add to priority queue
  queue_.push_back(BGItem());
  queue_.back().function = function;
  queue_.back().arg = arg;

  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}

这个函数做两件事,如果没有消费者,则创建一个消费线程,然后在队列中加入任务,否则,直接在队列中加入任务,整个过程由互斥量保护。注意,用的是队列哦,先进先出!
额外的线程函数

namespace {
struct StartThreadState {
  void (*user_function)(void*);
  void* arg;
};
}
static void* StartThreadWrapper(void* arg) {
  StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
  state->user_function(state->arg);
  delete state;
  return NULL;
}

void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
  pthread_t t;
  StartThreadState* state = new StartThreadState;
  state->user_function = function;
  state->arg = arg;
  PthreadCall("start thread",
              pthread_create(&t, NULL,  &StartThreadWrapper, state));
}

StartThread是一个共有成员函数,而匿名命名空间中的结构一个StartThreadWrapper则是用于辅助StartThread的,目前来看,只能将StartThread看作是方便临时的执行一些异步操作的函数。

最后是基类中的Default的实现

static pthread_once_t once = PTHREAD_ONCE_INIT;
static Env* default_env;
static void InitDefaultEnv() { default_env = new PosixEnv; }

Env* Env::Default() {
  pthread_once(&once, InitDefaultEnv);
  return default_env;
}

作者选择使用静态变量+pthread_once保证了只有一个env(InitDefaultEnv只会调用一次),然而,在积累class Env的析构函数中什么也没做,在PosixEnv的析构函数中确实非常变态的

virtual ~PosixEnv() {
  char msg[] = "Destroying Env::Default()\n";
  fwrite(msg, 1, sizeof(msg), stderr);
  abort();
}

析构函数中放个abort()是什么鬼啊喂?!那么这个default_env的内存什么时候释放呢?在哪儿释放呢?泄漏了?现在都不知道!

到这里,__env_posix.cc__还剩下一个没有提到,那就是日志。
相关的文件是posix_logger.h这里面是一个Logger的派生类及其实现,在实现里面借助vsnprintf来处理的可变参数。在PosixEnv中只是借助一下函数来产生一个Logger *对象

virtual Status NewLogger(const std::string& fname, Logger** result) {
  FILE* f = fopen(fname.c_str(), "w");
  if (f == NULL) {
    *result = NULL;
    return IOError(fname, errno);
  } else {
    *result = new PosixLogger(f, &PosixEnv::gettid);
    return Status::OK();
  }
}

再一次看到基类指针指向派生类!当然作者这么做估计有两个理由,一是方便跨平台,二是统一接口方便管理。
然后真正记录日志的是在__env.h__及__env.cc__中的

// env.h
// Log the specified data to *info_log if info_log is non-NULL.
extern void Log(Logger* info_log, const char* format, ...)
#   if defined(__GNUC__) || defined(__clang__)
    __attribute__((__format__ (__printf__, 2, 3)))
#   endif
    ;
// env.cc
void Log(Logger* info_log, const char* format, ...) {
  if (info_log != NULL) {
    va_list ap;
    va_start(ap, format);
    info_log->Logv(format, ap);
    va_end(ap);
  }
}

把这个函数拿出来说是有原因的!从来没看到过的__attribute__总述具体到这里的使用
gcc_attribute
为了理解图片中的这段话以及作者的这个函数,我写了个例子
arg.c

#include "arg.h"
#include <stdarg.h>

int myprintf(FILE *out, const char *fmt, ...)
{
    va_list ap;
    va_start(ap, fmt);
    vfprintf(out, fmt, ap);
    va_end(ap);
    return 0;
}

int main()
{
    myprintf(stdout, "%s", 1);
}

没有使用__attribute__的情况
no_attribute
使用__attribute__的情况
have_attribute
可以看出,作者使用__attribute__是为了让编译器来检查可变参数类型和格式化字符串是否匹配,以防止潜在的错误。(注:运行上面的例子将会得到segmentation fault)

Env辅助类EnvWrapper

和大多数辅助类一样,它的私有成员是一个基类的指针,共有成员就是基类的接口。不同的是,这个类继承了基类Env,关于这点,作者的解释是

// An implementation of Env that forwards all calls to another Env.
// May be useful to clients who wish to override just part of the functionality of another Env.

具体在哪儿用呢? 在memenv.cc中。

memenv

2016-04-06 19:23 更新

从源码看到,又是一个匿名命名空间!通过

Env* NewMemEnv(Env* base_env) {
  return new InMemoryEnv(base_env);
}

class InMemoryEnv暴露出来。下面就来看看这个匿名命名空间里面的几个东西。

FileState
在这个类里面有之前不知道的东西。

private:
// Private since only Unref() should be used to delete it.
~FileState() {
  for (std::vector<char*>::iterator i = blocks_.begin(); i != blocks_.end();
       ++i) {
  delete [] *i;
  }
}

它的析构函数是私有的!这意味着

  • 无法在栈上创建FileState
  • 在堆上创建的FileState对象不能用delete释放其占用的内存
  • 这个类不能被继承

那么,这里的析构函数什么时候会被调用呢?看下面这个简短的例子

#include <iostream>

class T {
  ~T() { std::cout << "x\n"; }
  public:
    T() {}
    void destroy() { delete this; }
};

int main() {
  T *t = new T;
  //t->destroy();
}

运行是会发现没有任何输出,并且丢失了1bytes的内存,这说明析构函数在对象离开作用的时候并没有被调用!
然而去掉代码中的注释后,再次运行就会发现析构函数被调用了,这发生在删除对象时。

回过头来看,作者还写了一个Unref函数来做上面destory相同的工作

// Decrease the reference count. Delete if this is the last reference.
void Unref() {
  bool do_delete = false;
  {
    MutexLock lock(&refs_mutex_);
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      do_delete = true;
    }
  }
  if (do_delete) {
    delete this;
  }
}

然而,又有了另外个问题:对象被删除后,对象里面的东西还在吗?
动手试一试,看下面的代码

#include <iostream>

using namespace std;

class T
{
  int *n;
  int tmp;
  ~T() { 
    cout << "delete n (n = " << *n << ")\n"; 
    delete n; 
  }
  public:
    T(): tmp(1) {}
    void destroy() { 
      cout << "start\n";
      delete this; 
      cout << *n << "\n";
      cout << "end\n";
    }
    void assign(int x) { n = new int(x); }
    friend ostream& operator<< (ostream &os, const T& t) {
      os << t.tmp;
      return os;
    }
};

int main()
{
  T *t = new T;
  t->assign(10);
  t->destroy();
  cout << *t << "\n";
  cout << "done\n";
}

以及运行输出

./a.out
start
delete n (n = 10)
0
end
1
done

可以看到n在被delete后就消失了,而tmp在对象被删除后仍然存在。但是,当使用valgrind检查内存问题会发现

valgrind --leak-check=full --show-leak-kinds=all ./a.out 
==4741== Memcheck, a memory error detector
==4741== Copyright (C) 2002-2015, and GNU GPL'd, by Julian Seward et al.
==4741== Using Valgrind-3.11.0 and LibVEX; rerun with -h for copyright info
==4741== Command: ./a.out
==4741== 
start
delete n (n = 10)
==4741== Invalid read of size 8
==4741==    at 0x400C82: T::destroy() (p.cpp:18)
==4741==    by 0x400B4D: main (p.cpp:32)
==4741==  Address 0x5a83c80 is 0 bytes inside a block of size 16 free'd
==4741==    at 0x4C2B82B: operator delete(void*) (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==4741==    by 0x400C73: T::destroy() (p.cpp:17)
==4741==    by 0x400B4D: main (p.cpp:32)
==4741==  Block was alloc'd at
==4741==    at 0x4C2A6CF: operator new(unsigned long) (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==4741==    by 0x400B13: main (p.cpp:30)
==4741== 
==4741== Invalid read of size 4
==4741==    at 0x400C85: T::destroy() (p.cpp:18)
==4741==    by 0x400B4D: main (p.cpp:32)
==4741==  Address 0x5a83cd0 is 0 bytes inside a block of size 4 free'd
==4741==    at 0x4C2B82B: operator delete(void*) (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==4741==    by 0x400D93: T::~T() (p.cpp:11)
==4741==    by 0x400C6A: T::destroy() (p.cpp:17)
==4741==    by 0x400B4D: main (p.cpp:32)
==4741==  Block was alloc'd at
==4741==    at 0x4C2A6CF: operator new(unsigned long) (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==4741==    by 0x400C05: T::assgin(int) (in /home/angel/a.out)
==4741==    by 0x400B44: main (p.cpp:31)
==4741== 
10
end
==4741== Invalid read of size 4
==4741==    at 0x400CE8: operator<<(std::ostream&, T const&) (p.cpp:23)
==4741==    by 0x400B60: main (p.cpp:33)
==4741==  Address 0x5a83c88 is 8 bytes inside a block of size 16 free'd
==4741==    at 0x4C2B82B: operator delete(void*) (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==4741==    by 0x400C73: T::destroy() (p.cpp:17)
==4741==    by 0x400B4D: main (p.cpp:32)
==4741==  Block was alloc'd at
==4741==    at 0x4C2A6CF: operator new(unsigned long) (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==4741==    by 0x400B13: main (p.cpp:30)
==4741== 
1
done
==4741== 
==4741== HEAP SUMMARY:
==4741==     in use at exit: 72,704 bytes in 1 blocks
==4741==   total heap usage: 3 allocs, 2 frees, 72,724 bytes allocated
==4741== 
==4741== 72,704 bytes in 1 blocks are still reachable in loss record 1 of 1
==4741==    at 0x4C2A16F: malloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==4741==    by 0x4EC72FF: ??? (in /usr/lib64/libstdc++.so.6.0.21)
==4741==    by 0x400F159: call_init.part.0 (in /lib64/ld-2.22.so)
==4741==    by 0x400F26A: _dl_init (in /lib64/ld-2.22.so)
==4741==    by 0x4000CA9: ??? (in /lib64/ld-2.22.so)
==4741== 
==4741== LEAK SUMMARY:
==4741==    definitely lost: 0 bytes in 0 blocks
==4741==    indirectly lost: 0 bytes in 0 blocks
==4741==      possibly lost: 0 bytes in 0 blocks
==4741==    still reachable: 72,704 bytes in 1 blocks
==4741==         suppressed: 0 bytes in 0 blocks
==4741== 
==4741== For counts of detected and suppressed errors, rerun with: -v
==4741== ERROR SUMMARY: 3 errors from 3 contexts (suppressed: 0 from 0)

可以看到两个非法读,分别是18行和23行,分别发生在n被删除后和this被删除后。
好像发现更大的问题了!但是就此打住,顺便说一句“Rust大法好!”。。。
在删除对象后应该将指针指向nullptr
最后,FileState这个类实现的读写其实是在操纵放在内存中的vector<char*>

后面文件的顺序读、随机读以及写操作类都是上面FileState的包装,实际上都是在内存上操作。

InMemoryEnv
这个类继承于EvnWrapper,这就是EnvWrapper的作用了!由于这个类完全是在操作内存,所以其中的许多方法其实是哑操作。。。不过作者实现这个类的思路可以借鉴!关于具体实现,其实很简单,所以略过。

总结一下

通过看这几个文件,发现作者总是将复制构造函数和operator=作为私有成员,然而从C++11起有了= delete这个东西。

  • 进一步熟悉了多态,以及namespace {}static的使用。
  • 我们还可以使用编译器提供的__attribute__来加强代码的检查。
  • 可以简单的使用标准库的容器来完成多个线程的管理(线程池)。
  • 了解了将析构函数作为私有成员有哪些不同(比如这样一种情况,你希望在析构之前必须做一些事情,但是用你类的人并不知道,
    那么你就可以重新写一个函数,里面把要做的事情全部做完了再调用析构函数。
    这样人家只能调用你这个函数析构对象,从而保证了析构前一定会做你要求的动作)。
  • 我们还可以把文件操作抽象一下放在内存里。


转载请注明:Serenity » leveldb - env