本文共 7692 字,大约阅读时间需要 25 分钟。
这一篇或者说一个系列用来记录Redis相关的一些源码分析,不定时更新。
目前已添加的内容:
Redis的eventloop实现比较常规:主要处理文件描述符事件和timer事件;其中timer只是简单地用一个单链表实现,寻找最近要触发的timer需要O(n)遍历。
// 注册系统timer事件if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can't create event loop timers."); exit(1);}// 注册poll fd的接收客户端连接的读事件for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); }}// 同上if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* Accept step: the descriptor returned by anetTcpAccept() for the listening
 * descriptor fd is stored in cfd and then forwarded, together with the cip
 * buffer, to acceptCommonHandler().
 * NOTE(review): presumably the body of acceptTcpHandler -- confirm. */
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
acceptCommonHandler(cfd,0,cip);
// receieved a client, alloc client structure // and register it into eventpollclient *createClient(int fd) {client *c = zmalloc(sizeof(client));if (fd != -1) { anetNonBlock(NULL,fd); anetEnableTcpNoDelay(NULL,fd); if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); // register read event for client connection // the callback handler is readQueryFromClient // read into client data buffer if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; }}
dQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) --> processInputBuffer // handle query buffer// in processInputBuffer(c);if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break;} else if (c->reqtype == PROTO_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != C_OK) break;} else { serverPanic("Unknown request type");}/* Multibulk processing could see a <= 0 length. */if (c->argc == 0) { resetClient(c);} else { /* Only reset the client when the command was executed. */ // handle the client command if (processCommand(c) == C_OK) resetClient(c); /* freeMemoryIfNeeded may flush slave output buffers. This may result * into a slave, that may be the active client, to be freed. */ if (server.current_client == NULL) break;}
// in processCommand /* Exec the command */if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){ queueMultiCommand(c); addReply(c,shared.queued);} else { // call the cmd // 进入具体数据结构的命令处理 call(c,CMD_CALL_FULL); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) handleClientsBlockedOnLists();}
Redis的hashtable实现叫dict,其实现和常见的哈希表没有太大的区别,唯一比较特殊的地方是每个dict结构内部有两个实际的hashtable结构dictht,这是为了实现增量哈希(incremental rehashing)。顾名思义,当第一个dictht达到一定负载因子后会触发rehash,但分配新的dictht结构的动作和真正的rehash动作是分离的,并且rehash被均摊到各个具体的操作中去了;因为Redis是单线程的,这样就不会长时间阻塞线程。另外,增量rehash既可以按指定的步数执行,也可以持续执行一段指定的时间。
/* dict.h -- data structures of the hash table implementation. */

/* One hash table entry. */
typedef struct dictEntry {
    void *key;                  /* key */
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;                        /* value, stored in one of four forms */
    struct dictEntry *next;     /* next entry of the linked list */
} dictEntry;

/* Operations (API) supplied for one particular type of hash table. */
typedef struct dictType {
    /* hash function */
    unsigned int (*hashFunction)(const void *key);
    /* copy a key */
    void *(*keyDup)(void *privdata, const void *key);
    /* copy a value */
    void *(*valDup)(void *privdata, const void *obj);
    /* key comparison */
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    /* destructor for a key */
    void (*keyDestructor)(void *privdata, void *key);
    /* destructor for a value */
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
    dictEntry **table;          /* entries */
    unsigned long size;         /* max size */
    unsigned long sizemask;     /* mask (presumably size - 1 -- confirm) */
    unsigned long used;         /* currently used */
} dictht;

/* A dictionary: type operations plus two hash tables. */
typedef struct dict {
    dictType *type;             /* type operations */
    void *privdata;             /* for extension */
    dictht ht[2];               /* two hash tables */
    long rehashidx;             /* rehashing not in progress if rehashidx == -1 */
    unsigned long iterators;    /* number of iterators currently running */
} dict;

/* If safe is set to 1 this is a safe iterator, that means, you can call
 * dictAdd, dictFind, and other functions against the dictionary even while
 * iterating. Otherwise it is a non safe iterator, and only dictNext()
 * should be called while iterating. */
typedef struct dictIterator {
    dict *d;
    long index;
    int table, safe;
    dictEntry *entry, *nextEntry;
    /* unsafe iterator fingerprint for misuse detection. */
    long long fingerprint;
} dictIterator;
// dict.h// createdict *dictCreate(dictType *type, void *privDataPtr);// expand or initilize the just created dict, alloc second hashtable of dict for incremental rehashingint dictExpand(dict *d, unsigned long size);// add, if in rehashing, do 1 step of incremental rehashingint dictAdd(dict *d, void *key, void *val);dictEntry *dictAddRaw(dict *d, void *key);// update, if in rehashing, do 1 step of incremental rehashing// can we first find and return the entry no matter it is update or add, so // we can speed up the update process because no need to do twice find process?int dictReplace(dict *d, void *key, void *val);dictEntry *dictReplaceRaw(dict *d, void *key);// delete if in rehashing, do 1 step of incremental rehashingint dictDelete(dict *d, const void *key); // free the memory int dictDeleteNoFree(dict *d, const void *key); // not free the memory// can we use a double linked list to free the hash table, so speed up?void dictRelease(dict *d);// find an entrydictEntry * dictFind(dict *d, const void *key);void *dictFetchValue(dict *d, const void *key);// resize to eh pow of 2 number just >= the used number of slotsint dictResize(dict *d);// alloc a new iteratordictIterator *dictGetIterator(dict *d);// alloc a safe iterator dictIterator *dictGetSafeIterator(dict *d);// next entry dictEntry *dictNext(dictIterator *iter);void dictReleaseIterator(dictIterator *iter);// random samplingdictEntry *dictGetRandomKey(dict *d);unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count);// get stats infovoid dictGetStats(char *buf, size_t bufsize, dict *d);// murmurhash unsigned int dictGenHashFunction(const void *key, int len);unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len);// empty a dict void dictEmpty(dict *d, void(callback)(void*));void dictEnableResize(void);void dictDisableResize(void);// do n steps rehashingint dictRehash(dict *d, int n);// do rehashing for a ms millisecondsint dictRehashMilliseconds(dict *d, int ms);// hash 
function seed void dictSetHashFunctionSeed(unsigned int initval);unsigned int dictGetHashFunctionSeed(void);// scan a dictunsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);
在dictReplace中能否统一add和update的查找:无论是add还是update都只查找一次并返回一个entry,再用一个标识表明是add还是update,这样update时就不用做两次查找,从而提升update的性能?
在release整个dict时,需要循环遍历所有bucket的头指针,最坏情况接近O(n);能否用双向的空闲链表来优化(当然这样会额外占用指针所需的空间)?