'use strict';

const { Readable, Duplex, PassThrough } = require('node:stream');
const assert = require('node:assert');
const { AsyncResource } = require('node:async_hooks');
const {
  InvalidArgumentError,
  InvalidReturnValueError,
  RequestAbortedError,
} = require('../core/errors');
const util = require('../core/util');
const { addSignal, removeSignal } = require('./abort-signal');

// Intentionally empty. Registered further down as a do-nothing 'error'
// listener on the request and response streams.
function noop() {}

// Symbol-keyed slot in which the stream classes below keep their "resume"
// callback, so it cannot collide with regular stream properties.
const kResume = Symbol('resume');

/**
 * Readable stream carrying the outgoing request body.
 *
 * A producer may park a single callback in the `kResume` slot; the next call
 * to `_read()` (or `_destroy()`) clears the slot and invokes that callback
 * once.
 */
class PipelineRequest extends Readable {
  constructor() {
    super({ autoDestroy: true });

    // No callback is parked initially.
    this[kResume] = null;
  }

  /**
   * Releases the parked callback, if there is one.
   */
  _read() {
    const pending = this[kResume];

    if (!pending) {
      return;
    }

    // Clear the slot before invoking so the callback runs at most once.
    this[kResume] = null;
    pending();
  }

  /**
   * Flushes any parked callback, then forwards `err` unchanged.
   *
   * @param {Error|null|undefined} err - Error passed to destroy, if any.
   * @param {Function} callback - Completion callback, called with `err`.
   */
  _destroy(err, callback) {
    this._read();

    callback(err);
  }
}

/**
 * Readable stream exposing the incoming response body.
 *
 * Every `_read()` call invokes the `resume` function supplied at
 * construction time.
 */
class PipelineResponse extends Readable {
  /**
   * @param {Function} resume - Called each time the stream wants more data.
   */
  constructor(resume) {
    super({ autoDestroy: true });
    this[kResume] = resume;
  }

  _read() {
    this[kResume]();
  }

  /**
   * Forwards `err`; when no error was given and 'end' has not been emitted
   * yet, a RequestAbortedError is reported instead.
   *
   * @param {Error|null|undefined} err - Error passed to destroy, if any.
   * @param {Function} callback - Completion callback.
   */
  _destroy(err, callback) {
    const keepAsIs = err || this._readableState.endEmitted;

    callback(keepAsIs ? err : new RequestAbortedError());
  }
}

/**
 * Dispatch handler behind `pipeline()`.
 *
 * It owns three streams:
 *  - `req`: a PipelineRequest fed by writes to `ret` (the request body),
 *  - `res`: a PipelineResponse fed by `onData` (the raw response body),
 *  - `ret`: the Duplex handed back to the caller; its writable side feeds
 *    `req`, its readable side is fed by the stream the user handler returns.
 *
 * The `onConnect` / `onHeaders` / `onData` / `onComplete` / `onError` methods
 * are presumably invoked by the dispatcher this instance is passed to —
 * confirm against the dispatcher's handler contract.
 */
class PipelineHandler extends AsyncResource {
  /**
   * @param {object} opts - Pipeline options. Reads `signal`, `method`,
   *   `opaque`, `onInfo`, `responseHeaders` and `objectMode`.
   * @param {Function} handler - Called once with
   *   `{ statusCode, headers, opaque, body, context }`; must return an object
   *   with an `on` method (a Readable).
   * @throws {InvalidArgumentError} If `opts` is not an object, `handler` is
   *   not a function, `signal` has neither `on` nor `addEventListener`,
   *   `method` is 'CONNECT', or `onInfo` is set but not a function.
   */
  constructor(opts, handler) {
    if (!opts || typeof opts !== 'object') {
      throw new InvalidArgumentError('invalid opts');
    }

    if (typeof handler !== 'function') {
      throw new InvalidArgumentError('invalid handler');
    }

    const { signal, method, opaque, onInfo, responseHeaders } = opts;

    if (
      signal &&
      typeof signal.on !== 'function' &&
      typeof signal.addEventListener !== 'function'
    ) {
      throw new InvalidArgumentError(
        'signal must be an EventEmitter or EventTarget'
      );
    }

    if (method === 'CONNECT') {
      throw new InvalidArgumentError('invalid method');
    }

    if (onInfo && typeof onInfo !== 'function') {
      throw new InvalidArgumentError('invalid onInfo callback');
    }

    super('UNDICI_PIPELINE');

    // `??` rather than `||`: opaque is an arbitrary pass-through value, so
    // falsy-but-meaningful values (0, '', false) must reach the handler
    // unchanged. Only null/undefined fall back to null.
    this.opaque = opaque ?? null;
    this.responseHeaders = responseHeaders || null;
    this.handler = handler;
    this.abort = null;
    this.context = null;
    this.onInfo = onInfo || null;

    // Errors on req are reported through `ret`; the noop listener keeps the
    // request stream from having no 'error' listener at all.
    this.req = new PipelineRequest().on('error', noop);

    this.ret = new Duplex({
      readableObjectMode: opts.objectMode,
      autoDestroy: true,
      // Consumer wants more data: un-pause the handler's stream, if it is
      // already set and supports resume().
      read: () => {
        const { body } = this;

        if (body?.resume) {
          body.resume();
        }
      },
      // Forward written chunks into the request body stream.
      write: (chunk, encoding, callback) => {
        const { req } = this;

        if (req.push(chunk, encoding) || req._readableState.destroyed) {
          callback();
        } else {
          // req is full: park the write callback; PipelineRequest#_read
          // invokes it once req is read from again.
          req[kResume] = callback;
        }
      },
      // Tear down every stream owned by this handler.
      destroy: (err, callback) => {
        const { body, req, res, ret, abort } = this;

        // Destroyed without an error before the readable side emitted 'end':
        // report it as an aborted request.
        if (!err && !ret._readableState.endEmitted) {
          err = new RequestAbortedError();
        }

        if (abort && err) {
          abort();
        }

        util.destroy(body, err);
        util.destroy(req, err);
        util.destroy(res, err);

        removeSignal(this);

        callback(err);
      },
    }).on('prefinish', () => {
      const { req } = this;

      // Writable side is finishing: end the request body.
      // Node < 15 does not call _final in same tick.
      req.push(null);
    });

    this.res = null;

    // NOTE(review): presumably wires `signal` so that aborting it tears this
    // handler down (and sets `this.reason`, read in onConnect) — confirm in
    // ./abort-signal.
    addSignal(this, signal);
  }

  /**
   * Stores the abort function and context for later use.
   *
   * @param {Function} abort - Aborts the underlying request.
   * @param {*} context - Passed through to the user handler.
   */
  onConnect(abort, context) {
    const { res } = this;

    // `reason` is not assigned anywhere in this class; when something else
    // has set it, abort immediately with that reason.
    if (this.reason) {
      abort(this.reason);
      return;
    }

    assert(!res, 'pipeline cannot be retried');

    this.abort = abort;
    this.context = context;
  }

  /**
   * Handles response headers. Status codes below 200 are only reported via
   * `onInfo`; for any other status the user handler is run and the stream it
   * returns is connected to the readable side of `ret`.
   *
   * @param {number} statusCode - Response status code.
   * @param {*} rawHeaders - Raw headers, parsed with `util.parseRawHeaders`
   *   when `responseHeaders === 'raw'`, otherwise with `util.parseHeaders`.
   * @param {Function} resume - Given to PipelineResponse, which calls it on
   *   every `_read()`.
   * @throws {InvalidReturnValueError} If the handler's return value is falsy
   *   or has no `on` method.
   * @throws Rethrows anything thrown while parsing headers or running the
   *   handler.
   */
  onHeaders(statusCode, rawHeaders, resume) {
    const { opaque, handler, context } = this;

    if (statusCode < 200) {
      if (this.onInfo) {
        const headers =
          this.responseHeaders === 'raw' ?
            util.parseRawHeaders(rawHeaders)
          : util.parseHeaders(rawHeaders);
        this.onInfo({ statusCode, headers });
      }
      return;
    }

    this.res = new PipelineResponse(resume);

    let body;
    try {
      // Drop the reference; the handler runs at most once.
      this.handler = null;
      const headers =
        this.responseHeaders === 'raw' ?
          util.parseRawHeaders(rawHeaders)
        : util.parseHeaders(rawHeaders);
      body = this.runInAsyncScope(handler, null, {
        statusCode,
        headers,
        opaque,
        body: this.res,
        context,
      });
    } catch (err) {
      // Make sure res has an 'error' listener before the failure propagates.
      this.res.on('error', noop);
      throw err;
    }

    if (!body || typeof body.on !== 'function') {
      throw new InvalidReturnValueError('expected Readable');
    }

    body
      .on('data', (chunk) => {
        const { ret, body } = this;

        // Respect backpressure from the readable side of ret.
        if (!ret.push(chunk) && body.pause) {
          body.pause();
        }
      })
      .on('error', (err) => {
        const { ret } = this;

        util.destroy(ret, err);
      })
      .on('end', () => {
        const { ret } = this;

        ret.push(null);
      })
      .on('close', () => {
        const { ret } = this;

        // Closed before ret's readable side was ended: treat as an abort.
        if (!ret._readableState.ended) {
          util.destroy(ret, new RequestAbortedError());
        }
      });

    this.body = body;
  }

  /**
   * Pushes a response chunk into `res`.
   *
   * @param {*} chunk - Response body chunk.
   * @returns {boolean} The result of `res.push(chunk)`.
   */
  onData(chunk) {
    const { res } = this;
    return res.push(chunk);
  }

  /**
   * Ends the response body stream.
   *
   * @param {*} trailers - Unused.
   */
  onComplete(trailers) {
    const { res } = this;
    res.push(null);
  }

  /**
   * Drops the handler reference and destroys `ret` with `err`.
   *
   * @param {Error} err - The failure to report.
   */
  onError(err) {
    const { ret } = this;
    this.handler = null;
    util.destroy(ret, err);
  }
}

/**
 * Starts a pipelined request on the dispatcher bound as `this`.
 *
 * The request is dispatched with a copy of `opts` whose `body` is the
 * handler's request stream. Anything thrown while building the handler or
 * dispatching is not rethrown; it is reported by destroying a fresh
 * PassThrough with that error.
 *
 * @param {object} opts - Request options, validated by PipelineHandler.
 * @param {Function} handler - User handler, forwarded to PipelineHandler.
 * @returns {import('node:stream').Duplex} The handler's `ret` duplex on
 *   success, otherwise an already-destroyed PassThrough.
 */
function pipeline(opts, handler) {
  let duplex;

  try {
    const pipelineHandler = new PipelineHandler(opts, handler);

    this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler);

    duplex = pipelineHandler.ret;
  } catch (err) {
    duplex = new PassThrough();
    duplex.destroy(err);
  }

  return duplex;
}

// `pipeline` reads `this.dispatch`, so it must be invoked with a dispatcher
// as its receiver.
module.exports = pipeline;