AxiosTransformStream.js 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. 'use strict';
  2. import stream from 'stream';
  3. import utils from '../utils.js';
  4. import throttle from './throttle.js';
  5. import speedometer from './speedometer.js';
  // Private key under which every stream instance keeps its mutable bookkeeping state.
  const kInternals = Symbol('internals');

  /**
   * Pass-through Transform stream that can (a) cap throughput to `maxRate`
   * bytes per second by splitting and delaying chunks, and (b) emit throttled
   * `progress` events once a `progress` listener has been attached.
   *
   * The `progress` event payload has the shape
   * `{loaded, total, progress, bytes, rate, estimated, lengthComputable}`.
   */
  class AxiosTransformStream extends stream.Transform {
    /**
     * @param {Object} [options]
     * @param {number} [options.length] Total number of bytes expected; used only for progress reporting.
     * @param {number} [options.maxRate=0] Throughput cap in bytes per second; a falsy value disables throttling.
     * @param {number} [options.chunkSize=65536] Readable high-water mark, which is also the largest chunk emitted at once.
     * @param {number|false} [options.minChunkSize=100] A chunk is only split when the remainder exceeds this many bytes; `false` removes the lower bound.
     * @param {number} [options.timeWindow=500] Length of one throttling window in milliseconds.
     * @param {number} [options.ticksRate=2] Passed to the throttle helper for progress events (presumably events per second; confirm against ./throttle.js).
     * @param {number} [options.samplesCount=15] Multiplied by `ticksRate` and handed to the speedometer helper (presumably its sample buffer size; confirm against ./speedometer.js).
     */
    constructor(options) {
      // NOTE(review): presumably merges the caller's defined options over these
      // defaults; exact semantics live in utils.toFlatObject.
      options = utils.toFlatObject(options, {
        maxRate: 0,
        chunkSize: 64 * 1024,
        minChunkSize: 100,
        timeWindow: 500,
        ticksRate: 2,
        samplesCount: 15
      }, null, (prop, source) => {
        return !utils.isUndefined(source[prop]);
      });

      super({
        readableHighWaterMark: options.chunkSize
      });

      const internals = this[kInternals] = {
        length: options.length,
        timeWindow: options.timeWindow,
        ticksRate: options.ticksRate,
        chunkSize: options.chunkSize,
        maxRate: options.maxRate,
        minChunkSize: options.minChunkSize,
        bytesSeen: 0,          // total bytes pushed downstream so far
        isCaptured: false,     // becomes true once a 'progress' listener is attached
        notifiedBytesLoaded: 0,
        ts: Date.now(),        // start of the current throttling window
        bytes: 0,              // bytes charged against the current throttling window
        onReadCallback: null   // continuation parked while the readable side is full
      };

      const _speedometer = speedometer(internals.ticksRate * options.samplesCount, internals.timeWindow);

      // Progress bookkeeping is only performed once somebody actually listens for it.
      this.on('newListener', (event) => {
        if (event === 'progress') {
          internals.isCaptured = true;
        }
      });

      // Value of bytesSeen at the time of the last emitted progress event.
      let bytesNotified = 0;

      internals.updateProgress = throttle(() => {
        const totalBytes = internals.length;
        const bytesTransferred = internals.bytesSeen;
        const progressBytes = bytesTransferred - bytesNotified;

        // Nothing new to report, or the stream is already torn down.
        if (!progressBytes || this.destroyed) return;

        const rate = _speedometer(progressBytes);

        bytesNotified = bytesTransferred;

        // Emit asynchronously so listeners never run inside the push path.
        process.nextTick(() => {
          this.emit('progress', {
            loaded: bytesTransferred,
            total: totalBytes,
            progress: totalBytes ? (bytesTransferred / totalBytes) : undefined,
            bytes: progressBytes,
            rate: rate ? rate : undefined,
            estimated: rate && totalBytes && bytesTransferred <= totalBytes ?
              (totalBytes - bytesTransferred) / rate : undefined,
            lengthComputable: totalBytes != null
          });
        });
      }, internals.ticksRate);

      const onFinish = () => {
        // NOTE(review): `true` is passed as `this`, not as an argument. Presumably
        // the throttle helper treats `this === true` as a "flush now" flag; confirm
        // against ./throttle.js before changing this call.
        internals.updateProgress.call(true);
      };

      this.once('end', onFinish);
      this.once('error', onFinish);
    }

    /**
     * Resumes a transform that was parked because `push()` reported a full
     * readable buffer, then defers to the default Transform read logic.
     *
     * @param {number} size Forwarded unchanged to `Transform.prototype._read`.
     */
    _read(size) {
      const internals = this[kInternals];

      if (internals.onReadCallback) {
        internals.onReadCallback();
      }

      return super._read(size);
    }

    /**
     * Forwards `chunk` downstream unchanged, splitting it into pieces no larger
     * than the readable high-water mark and, when `maxRate` is set, no larger
     * than what is left of the current time window's byte budget. When the
     * budget is exhausted the rest of the chunk waits for the next window.
     *
     * @param {Buffer|string} chunk Data to forward.
     * @param {string} encoding Unused.
     * @param {function(?Error=): void} callback Invoked once the whole chunk has been pushed.
     */
    _transform(chunk, encoding, callback) {
      const internals = this[kInternals];
      const maxRate = internals.maxRate;
      const readableHighWaterMark = this.readableHighWaterMark;
      const timeWindow = internals.timeWindow;

      const divider = 1000 / timeWindow;

      // Byte budget per time window. It must be a whole number of at least one
      // byte: a fractional budget eventually leaves 0 < bytesLeft < 1, and
      // slicing by that amount produces an empty chunk plus an unchanged
      // remainder, which would be re-queued via nextTick without ever making
      // progress (busy loop until the window expires, or forever if the budget
      // itself is below one byte).
      const bytesThreshold = maxRate ? Math.max(1, Math.floor(maxRate / divider)) : 0;

      const minChunkSize = internals.minChunkSize !== false ?
        Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;

      // Pushes one piece downstream and continues once the readable side has room.
      const pushChunk = (_chunk, _callback) => {
        const bytes = Buffer.byteLength(_chunk);
        internals.bytesSeen += bytes;
        internals.bytes += bytes;

        if (internals.isCaptured) {
          internals.updateProgress();
        }

        if (this.push(_chunk)) {
          process.nextTick(_callback);
        } else {
          // Readable buffer is full: park the continuation until _read() fires.
          internals.onReadCallback = () => {
            internals.onReadCallback = null;
            process.nextTick(_callback);
          };
        }
      };

      // Emits as much of _chunk as currently allowed; any remainder is handed
      // back through _callback(null, remainder).
      const transformChunk = (_chunk, _callback) => {
        const chunkSize = Buffer.byteLength(_chunk);
        let chunkRemainder = null;
        let maxChunkSize = readableHighWaterMark;

        if (maxRate) {
          const now = Date.now();
          let passed = 0;

          if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
            // New window: carry over whatever was sent beyond the previous budget.
            internals.ts = now;
            const overdraft = internals.bytes - bytesThreshold;
            internals.bytes = overdraft > 0 ? overdraft : 0;
            passed = 0;
          }

          const bytesLeft = bytesThreshold - internals.bytes;

          if (bytesLeft <= 0) {
            // Budget exhausted: retry the same chunk when the next window starts.
            return setTimeout(() => {
              _callback(null, _chunk);
            }, timeWindow - passed);
          }

          if (bytesLeft < maxChunkSize) {
            maxChunkSize = bytesLeft;
          }
        }

        // Split only when the leftover piece is large enough to be worth it.
        if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
          chunkRemainder = _chunk.subarray(maxChunkSize);
          _chunk = _chunk.subarray(0, maxChunkSize);
        }

        pushChunk(_chunk, chunkRemainder ? () => {
          process.nextTick(_callback, null, chunkRemainder);
        } : _callback);
      };

      transformChunk(chunk, function transformNextChunk(err, _chunk) {
        if (err) {
          return callback(err);
        }

        if (_chunk) {
          transformChunk(_chunk, transformNextChunk);
        } else {
          callback(null);
        }
      });
    }

    /**
     * Sets the total expected byte count used for progress reporting.
     *
     * @param {number|string} length Coerced to a number with unary plus.
     * @returns {AxiosTransformStream} This stream, for chaining.
     */
    setLength(length) {
      this[kInternals].length = +length;
      return this;
    }
  }
  155. export default AxiosTransformStream;