events模塊是Node.js中比較簡單但是卻非常核心的模塊,Node.js中,很多模塊都繼承於events模塊,events模塊是發布、訂閱模式的實現。我們首先看一下如何使用events模塊。
1. const { EventEmitter } = require('events'); 2. class Events extends EventEmitter {} 3. const events = new Events(); 4. events.on('demo', () => { 5. console.log('emit demo event'); 6. }); 7. events.emit('demo');接下來我們看一下events模塊的具體實現。
1 初始化 當new一個EventEmitter或者它的子類時,就會進入EventEmitter的邏輯。
1. function EventEmitter(opts) { 2. EventEmitter.init.call(this, opts); 3. } 4. 5. EventEmitter.init = function(opts) { 6. // 如果是未初始化或者沒有自定義_events,則初始化 7. if (this._events === undefined || 8. this._events === ObjectGetPrototypeOf(this)._events) { 9. this._events = ObjectCreate(null); 10. this._eventsCount = 0; 11. } 12. /*13. 初始化一類事件的處理函數個數的閾值14. 我們可以通過setMaxListeners接口設置,15. 如果沒有顯示設置,閾值則為defaultMaxListeners的值(10),16. 可通過getMaxListeners接口獲取17. */ 18. this._maxListeners = this._maxListeners || undefined; 19. 20. // 是否開啟捕獲promise reject,默認false 21. if (opts && opts.captureRejections) { 22. this[kCapture] = Boolean(opts.captureRejections); 23. } else { 24. this[kCapture] = EventEmitter.prototype[kCapture]; 25. } 26. };EventEmitter的初始化主要是初始化了一些數據結構和屬性。唯一支持的一個參數就是captureRejections,captureRejections表示當觸發事件,執行處理函數時,EventEmitter是否捕獲處理函數中的異常。後面我們會詳細講解。
2 訂閱事件 初始化完EventEmitter之後,我們就可以開始使用訂閱、發布的功能。我們可以通過addListener、prependListener、on、once訂閱事件。addListener和on是等價的,prependListener的區別在於處理函數會被插入到隊首,而默認是追加到隊尾。once註冊的處理函數,最多被執行一次。四個api都是通過_addListener函數實現的。下面我們看一下具體實現。
1. function _addListener(target, type, listener, prepend) { 2. let m; 3. let events; 4. let existing; 5. events = target._events; 6. // 還沒有初始化_events則初始化,_eventsCount為事件類型個數 7. if (events === undefined) { 8. events = target._events = ObjectCreate(null); 9. target._eventsCount = 0; 10. } else { 11. /* 12. 已經註冊過事件,則判斷是否定義了newListener事件,13. 是的話先觸發,如果監聽了newListener事件,每次註冊14. 其它事件時都會觸發newListener,相當於鉤子 15. */ 16. if (events.newListener !== undefined) { 17. target.emit('newListener', 18. type, 19. listener.listener ? 20. listener.listener : 21. listener); 22. // newListener處理函數可能會修改_events,這裡重新賦值 23. events = target._events; 24. } 25. // 判斷是否已經存在處理函數 26. existing = events[type]; 27. } 28. // 不存在則以函數的形式存儲,否則以數組形式存儲 29. if (existing === undefined) { 30. events[type] = listener; 31. // 新增一個事件類型,事件類型個數加一32. ++target._eventsCount; 33. } else { 34. /* 35. existing是函數說明之前註冊過該事件一次,36. 否則說明existing為數組,則直接插入相應位置37. */38. if (typeof existing === 'function') { 39. existing = events[type] = 40. prepend ? [listener, existing] : [existing, listener]; 41. } else if (prepend) { 42. existing.unshift(listener); 43. } else { 44. existing.push(listener); 45. } 46. 47. // 處理告警,處理函數過多可能是因為之前的沒有刪除,造成內存洩漏 48. m = _getMaxListeners(target); 49. // 該事件處理函數達到閾值並且還沒有提示過警告信息則提示50. if (m > 0 && existing.length > m && !existing.warned) { 51. existing.warned = true; 52. const w = new Error('錯誤信息…'); 53. w.name = 'MaxListenersExceededWarning'; 54. w.emitter = target; 55. w.type = type; 56. w.count = existing.length; 57. process.emitWarning(w); 58. } 59. } 60. 61. return target; 62. }接下來我們看一下once的實現,對比其它幾種api,once的實現相對比較複雜,因為我們要控制處理函數最多執行一次,所以我們需要保證在事件觸發的時候,執行用戶定義函數的同時,還需要刪除註冊的事件。
1. EventEmitter.prototype.once = function once(type, listener) { 2. this.on(type, _onceWrap(this, type, listener)); 3. return this; 4. }; 5. 6. function onceWrapper() { 7. // 還沒有觸發過 8. if (!this.fired) { 9. // 刪除它 10. this.target.removeListener(this.type, this.wrapFn); 11. // 觸發了 12. this.fired = true; 13. // 執行 14. if (arguments.length === 0) 15. return this.listener.call(this.target); 16. return this.listener.apply(this.target, arguments); 17. } 18. } 19. // 支持once api 20. function _onceWrap(target, type, listener) { 21. // fired是否已執行處理函數,wrapFn包裹listener的函數 22. const state = { fired: false, wrapFn: undefined, target, type, listener }; 23. // 生成一個包裹listener的函數 24. const wrapped = onceWrapper.bind(state); 25. /*26. 把原函數listener也掛到包裹函數中,用於事件沒有觸發前,27. 用戶主動刪除,見removeListener 28. */29. wrapped.listener = listener; 30. // 保存包裹函數,用於執行完後刪除,見onceWrapper 31. state.wrapFn = wrapped; 32. return wrapped; 33. }Once函數構造一個上下文(state)保存用戶處理函數和執行狀態等信息,然後通過bind返回一個帶有該上下文(state)的函數wrapped註冊到事件系統。當事件觸發時,在wrapped函數中首先移除wrapped,然後執行用戶的函數。Wrapped起到了劫持的作用。另外還需要在wrapped上保存用戶傳進來的函數,當用戶在事件觸發前刪除該事件時或解除該函數時,在遍歷該類事件的處理函數過程中,可以通過wrapped.listener找到對應的項進行刪除。
3 觸發事件 分析完事件的訂閱,接著我們看一下事件的觸發。
1. EventEmitter.prototype.emit = function emit(type, ...args) { 2. // 觸發的事件是否是error,error事件需要特殊處理 3. let doError = (type === 'error'); 4. 5. const events = this._events; 6. // 定義了處理函數(不一定是type事件的處理函數) 7. if (events !== undefined) { 8. /*9. 如果觸發的事件是error,並且監聽了kErrorMonitor10. 事件則觸發kErrorMonitor事件11. */ 12. if (doError && events[kErrorMonitor] !== undefined) 13. this.emit(kErrorMonitor, ...args); 14. // 觸發的是error事件但是沒有定義處理函數 15. doError = (doError && events.error === undefined); 16. } else if (!doError) 17. // 沒有定義處理函數並且觸發的不是error事件則不需要處理, 18. return false; 19. 20. // If there is no 'error' event listener then throw. 21. // 觸發的是error事件,但是沒有定義處理error事件的函數,則報錯 22. if (doError) { 23. let er; 24. if (args.length > 0) 25. er = args[0]; 26. // 第一個入參是Error的實例 27. if (er instanceof Error) { 28. try { 29. const capture = {}; 30. /* 31. 給capture對象注入stack屬性,stack的值是執行 32. Error.captureStackTrace語句的當前棧信息,但是33. 不包括emit的部分 34. */ 35. Error.captureStackTrace(capture, EventEmitter.prototype.emit); 36. ObjectDefineProperty(er, kEnhanceStackBeforeInspector, { 37. value: enhanceStackTrace.bind(this, er, capture), 38. configurable: true 39. }); 40. } catch {} 41. throw er; // Unhandled 'error' event 42. } 43. 44. let stringifiedEr; 45. const { inspect } = require('internal/util/inspect'); 46. try { 47. stringifiedEr = inspect(er); 48. } catch { 49. stringifiedEr = er; 50. } 51. const err = new ERR_UNHANDLED_ERROR(stringifiedEr); 52. err.context = er; 53. throw err; // Unhandled 'error' event 54. } 55. // 獲取type事件對應的處理函數 56. const handler = events[type]; 57. // 沒有則不處理 58. if (handler === undefined) 59. return false; 60. // 等於函數說明只有一個 61. if (typeof handler === 'function') { 62. // 直接執行 63. const result = ReflectApply(handler, this, args); 64. // 非空判斷是不是promise並且是否需要處理,見addCatch 65. if (result !== undefined && result !== null) { 66. addCatch(this, result, type, args); 67. } 68. } else { 69. // 多個處理函數,同上 70. const len = handler.length; 71. const listeners = arrayClone(handler, len); 72. for (let i = 0; i < len; ++i) { 73. const result = ReflectApply(listeners[i], this, args); 74. if (result !== undefined && result !== null) { 75. addCatch(this, result, type, args); 76. } 77. } 78. } 79. 80. return true; 81. }我們看到在Node.js中,對於error事件是特殊處理的,如果用戶沒有註冊error事件的處理函數,可能會導致程序掛掉,另外我們看到有一個addCatch的邏輯,addCatch是為了支持事件處理函數為異步模式的情況,比如async函數或者返回Promise的函數。
1. function addCatch(that, promise, type, args) { 2. // 沒有開啟捕獲則不需要處理 3. if (!that[kCapture]) { 4. return; 5. } 6. // that throws on second use. 7. try { 8. const then = promise.then; 9. 10. if (typeof then === 'function') { 11. // 註冊reject的處理函數 12. then.call(promise, undefined, function(err) { 13. process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args); 14. }); 15. } 16. } catch (err) { 17. that.emit('error', err); 18. } 19. } 20. 21. function emitUnhandledRejectionOrErr(ee, err, type, args) { 22. // 用戶實現了kRejection則執行 23. if (typeof ee[kRejection] === 'function') { 24. ee[kRejection](err, type, ...args); 25. } else { 26. // 保存當前值 27. const prev = ee[kCapture]; 28. try { 29. /* 30. 關閉然後觸發error事件,意義 31. 1 防止error事件處理函數也拋出error,導致死循環 32. 2 如果用戶處理了error,則進程不會退出,所以需要恢復33. kCapture的值如果用戶沒有處理error,則Node.js會觸發34. uncaughtException,如果用戶處理了uncaughtException35. 則需要恢復kCapture的值 36. */ 37. ee[kCapture] = false; 38. ee.emit('error', err); 39. } finally { 40. ee[kCapture] = prev; 41. } 42. } 43. }4 取消訂閱 我們接著看一下刪除事件處理函數的邏輯。
1. function removeAllListeners(type) { 2. const events = this._events; 3. if (events === undefined) 4. return this; 5. 6. /*7. 沒有註冊removeListener事件,則只需要刪除數據,8. 否則還需要觸發removeListener事件 9. */10. if (events.removeListener === undefined) { 11. // 等於0說明是刪除全部 12. if (arguments.length === 0) { 13. this._events = ObjectCreate(null); 14. this._eventsCount = 0; 15. } else if (events[type] !== undefined) { 16. /*17. 否則是刪除某個類型的事件,是唯一一個處理函數,18. 則重置_events,否則刪除對應的事件類型 19. */20. if (--this._eventsCount === 0) 21. this._events = ObjectCreate(null); 22. else 23. delete events[type]; 24. } 25. return this; 26. } 27. 28. /*29. 說明註冊了removeListener事件,arguments.length === 030. 說明刪除所有類型的事件 31. */32. if (arguments.length === 0) { 33. /* 34. 逐個刪除,除了removeListener事件,35. 這裡刪除了非removeListener事件36. */ 37. for (const key of ObjectKeys(events)) { 38. if (key === 'removeListener') continue; 39. this.removeAllListeners(key); 40. } 41. // 這裡刪除removeListener事件,見下面的邏輯 42. this.removeAllListeners('removeListener'); 43. // 重置數據結構 44. this._events = ObjectCreate(null); 45. this._eventsCount = 0; 46. return this; 47. } 48. // 刪除某類型事件 49. const listeners = events[type]; 50. 51. if (typeof listeners === 'function') { 52. this.removeListener(type, listeners); 53. } else if (listeners !== undefined) { 54. // LIFO order 55. for (let i = listeners.length - 1; i >= 0; i--) { 56. this.removeListener(type, listeners[i]); 57. } 58. } 59. 60. return this; 61. }removeAllListeners函數主要的邏輯有兩點,第一個是removeListener事件需要特殊處理,這類似一個鉤子,每次用戶刪除事件處理函數的時候都會觸發該事件。第二是removeListener函數。removeListener是真正刪除事件處理函數的實現。removeAllListeners是封裝了removeListener的邏輯。
1. function removeListener(type, listener) { 2. let originalListener; 3. const events = this._events; 4. // 沒有東西可刪除 5. if (events === undefined) 6. return this; 7. 8. const list = events[type]; 9. // 同上 10. if (list === undefined) 11. return this; 12. // list是函數說明只有一個處理函數,否則是數組,如果list.listener === listener說明是once註冊的 13. if (list === listener || list.listener === listener) { 14. // type類型的處理函數就一個,並且也沒有註冊其它類型的事件,則初始化_events 15. if (--this._eventsCount === 0) 16. this._events = ObjectCreate(null); 17. else { 18. // 就一個執行完刪除type對應的屬性 19. delete events[type]; 20. // 註冊了removeListener事件,則先註冊removeListener事件 21. if (events.removeListener) 22. this.emit('removeListener',23. type,24. list.listener || listener); 25. } 26. } else if (typeof list !== 'function') { 27. // 多個處理函數 28. let position = -1; 29. // 找出需要刪除的函數 30. for (let i = list.length - 1; i >= 0; i--) { 31. if (list[i] === listener || 32. list[i].listener === listener) { 33. // 保存原處理函數,如果有的話 34. originalListener = list[i].listener; 35. position = i; 36. break; 37. } 38. } 39. 40. if (position < 0) 41. return this; 42. // 第一個則出隊,否則刪除一個 43. if (position === 0) 44. list.shift(); 45. else { 46. if (spliceOne === undefined) 47. spliceOne = require('internal/util').spliceOne; 48. spliceOne(list, position); 49. } 50. // 如果只剩下一個,則值改成函數類型 51. if (list.length === 1) 52. events[type] = list[0]; 53. // 觸發removeListener 54. if (events.removeListener !== undefined) 55. this.emit('removeListener', 56. type,57. originalListener || listener); 58. } 59. 60. return this; 61. };以上就是events模塊的核心邏輯,另外還有一些工具函數就不一一分析。