import {
  CircularDataFrame,
  DataQueryRequest,
  DataQueryResponse,
  DataSourceInstanceSettings,
  FieldType,
  LoadingState,
} from '@grafana/data';
import { DataSourceWithBackend } from '@grafana/runtime';
import { BehaviorSubject, combineLatest, map, merge, Observable } from 'rxjs';

import { DEFAULT_QUERY, MyDataSourceOptions, MyQuery } from './types';
import { components } from 'api';
import { defaults } from 'lodash';
import ReconnectingWebSocket from 'reconnecting-websocket';
import { FluxTableMetaData, InfluxDB } from '@influxdata/influxdb-client';
import { WS_API_URL, WT_API_URL } from 'common';

type Channel = components['schemas']['Channel'];

// InfluxDB connection settings used by this module's Flux queries.
// NOTE(review): the API token below is a literal inside a client-side module (the code reads
// `window.location`), so anyone who can load the bundle can read it. Consider rotating it and
// sourcing it from the data source configuration instead of committing it here.
const org = 'tcom';
const token = '4R8an4pC6D5zZzt7LCOhvD5R-mR4pu55AmKYE_P_SvqRxzELAJVr4JytXcn5ADn3x4CHDN3xTTgWgcDWsDUclg==';

// Pages served over HTTPS talk to the hosted InfluxDB; anything else falls back to a local instance.
const servedOverHttps = window.location.protocol === 'https:';
const url = servedOverHttps ? 'https://tcom-grafana.arkhvoid.com/influxdb' : 'http://127.0.0.1:8086';

// Single shared query client for the whole module.
const influxClient = new InfluxDB({ url, token });
const queryApi = influxClient.getQueryApi(org);

/** Latest known list of channels; seeded with an empty list. */
export const channelListObservable = new BehaviorSubject<Channel[]>([]);
/** Latest known list of dataflows; seeded with an empty list. */
export const dataflowsObservable = new BehaviorSubject<Array<components['schemas']['Dataflow']>>([]);

/**
 * Derives the channels whose owning dataflow has a sink with the given output type.
 *
 * Re-evaluated whenever either the dataflow list or the channel list changes.
 * A channel whose dataflow is unknown, or whose dataflow has no expanded sink, is excluded.
 */
function channelsWithSinkOutputType(outputType: string): Observable<Channel[]> {
  return combineLatest([dataflowsObservable, channelListObservable]).pipe(
    map(([dataflows, channels]) =>
      channels.filter((channel) => {
        const owner = dataflows.find((dataflow) => dataflow.id === channel.dataflowId);
        return owner?.expand?.sink?.outputType === outputType;
      })
    )
  );
}

/** Channels that belong to a dataflow whose sink output type is `TIMESERIES`. */
export const timeSeriesChannelListObservable = channelsWithSinkOutputType('TIMESERIES');
/** Channels that belong to a dataflow whose sink output type is `FREQUENCY`. */
export const frequencyChannelListObservable = channelsWithSinkOutputType('FREQUENCY');
/**
 * Forwards every list received on `socket` into `subject`.
 *
 * Each message is expected to be a JSON array. Frames that are not valid JSON, or that decode to
 * something other than an array, are logged and dropped so that a single bad frame cannot throw
 * out of the event listener or push a non-list value to subscribers that call array methods on it.
 *
 * @param socket  websocket delivering JSON-encoded lists
 * @param subject subject that receives each decoded list
 * @param label   short name used in log messages
 */
function forwardJsonLists<T>(socket: ReconnectingWebSocket, subject: BehaviorSubject<T[]>, label: string): void {
  socket.addEventListener('message', (event) => {
    let payload: unknown;
    try {
      payload = JSON.parse(event.data);
    } catch (error) {
      console.error(`Ignoring malformed ${label} message`, error);
      return;
    }

    if (!Array.isArray(payload)) {
      console.error(`Ignoring ${label} message that is not a list`, payload);
      return;
    }

    subject.next(payload as T[]);
  });
}

const ws = new ReconnectingWebSocket(`${WS_API_URL}/api/channels/watch`);
forwardJsonLists(ws, channelListObservable, 'channel list');

const ws2 = new ReconnectingWebSocket(`${WS_API_URL}/api/dataflows/watch`);
forwardJsonLists(ws2, dataflowsObservable, 'dataflow list');

/**
 * Opens a WebTransport connection to `url` and resolves with it once the
 * connection's `ready` promise has fulfilled.
 */
// @ts-ignore
async function initTransport(url: string): Promise<WebTransport> {
  // @ts-ignore
  const connection = new WebTransport(url);

  // Hand the connection out only after it is usable.
  await connection.ready;

  return connection;
}

/**
 * Connects to `url` and opens one bidirectional stream on the connection.
 *
 * @param url WebTransport endpoint to connect to
 * @returns a tuple of the transport and the stream's `{ writer, reader }` pair
 * @throws whatever the connection or stream setup rejects with; when the stream cannot be set up,
 *         the already-open transport is closed before the error is rethrown so it does not leak
 */
async function startTransport(url: string) {
  const transport = await initTransport(url);

  try {
    const stream = await transport.createBidirectionalStream();
    const writer = stream.writable.getWriter() as WritableStreamDefaultWriter<Uint8Array>;
    const reader = stream.readable.getReader() as ReadableStreamDefaultReader<Uint8Array>;

    return [transport, { writer, reader }] as const;
  } catch (error) {
    // The caller never receives the transport on this path, so nobody else could close it.
    transport.close();
    throw error;
  }
}

/**
 * Waits for a transport to close and logs how it ended.
 *
 * The endpoint has to be supplied by the caller: the module-level `url` constant is the InfluxDB
 * address, not the WebTransport endpoint, so it must not be used in these messages.
 *
 * @param transport connection whose `closed` promise is awaited
 * @param endpoint  optional endpoint description included in the log message
 */
// @ts-ignore
async function closeTransport(transport: WebTransport, endpoint?: string) {
  const target = endpoint ? ` to ${endpoint}` : '';

  try {
    await transport.closed;
    console.log(`The HTTP/3 connection${target} closed gracefully.`);
  } catch (error) {
    console.error(`The HTTP/3 connection${target} closed due to ${error}.`);
  }
}

/** Points per second buffered for a channel that does not configure `maxDataPointsPerSecond`. */
const DEFAULT_MAX_DATA_POINTS_PER_SECOND = 24;

/**
 * Builds an empty circular frame with a `time` field and one numeric field named after the channel.
 *
 * The numeric field uses the channel's configured colour when one is set and the
 * `palette-classic-by-name` colour mode otherwise.
 *
 * @param refId                  query refId stamped on the frame
 * @param channel                channel the frame belongs to
 * @param maxDataPointsPerSecond points per second to reserve room for
 * @param seconds                length of the time window in seconds
 */
function createChannelFrame(
  refId: string,
  channel: Channel,
  maxDataPointsPerSecond: number,
  seconds: number
): CircularDataFrame {
  const frame = new CircularDataFrame({
    append: 'head',
    capacity: maxDataPointsPerSecond * seconds,
  });
  const fixedColor = channel.plottingAttributes?.color as string | undefined;

  frame.refId = refId;
  frame.addField({ name: 'time', type: FieldType.time });
  frame.addField({
    name: channel.name,
    type: FieldType.number,
    config: {
      color: fixedColor ? { mode: 'fixed', fixedColor } : { mode: 'palette-classic-by-name' },
    },
  });

  return frame;
}

/**
 * Data source that serves channel data either as a one-off InfluxDB range query or, when the
 * dashboard range ends at `now`, as a live WebTransport stream back-filled from InfluxDB.
 */
export class DataSource extends DataSourceWithBackend<MyQuery, MyDataSourceOptions> {
  constructor(instanceSettings: DataSourceInstanceSettings<MyDataSourceOptions>) {
    super(instanceSettings);
  }

  /**
   * Runs every target of the request and merges their responses.
   *
   * For each target one frame per selected channel is kept. With a fixed time range each frame is
   * filled from InfluxDB and a `Done` response is emitted as each channel finishes loading. With a
   * range ending at `now`, samples are read from a WebTransport stream, each channel is back-filled
   * from InfluxDB once its first live sample has arrived, and the visible frames are emitted
   * periodically with state `Streaming`.
   *
   * Unsubscribing stops all timers, detaches from the channel list and closes the transport.
   *
   * @param options the Grafana query request
   * @returns an observable merging the responses of all targets
   */
  query(options: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
    const seconds = options.range.to.diff(options.range.from, 'seconds');

    const observables = options.targets.map((target) => {
      const query = defaults(target, DEFAULT_QUERY);

      return new Observable<DataQueryResponse>((subscriber) => {
        // @ts-ignore
        let wt: WebTransport | undefined;
        let retryTimer: ReturnType<typeof setTimeout> | undefined;
        let backfillTimer: ReturnType<typeof setInterval> | undefined;
        let emitTimer: ReturnType<typeof setInterval> | undefined;
        let channelListSubscription: ReturnType<typeof channelListObservable.subscribe> | undefined;
        // Set by the teardown so that asynchronous steps still in flight stop allocating resources.
        let disposed = false;

        // Closing the transport on teardown makes the pending read fail; that is expected, not an error.
        const logUnlessDisposed = (error: unknown) => {
          if (!disposed) {
            console.error(error);
          }
        };

        const run = async (): Promise<void> => {
          if (disposed) {
            return;
          }

          // The channel list arrives asynchronously; poll until it is populated.
          if (channelListObservable.value.length === 0) {
            retryTimer = setTimeout(() => {
              run().catch(logUnlessDisposed);
            }, 50);
            return;
          }

          const channelsFrameMap: Record<string, CircularDataFrame> = {};
          const hiddenChannelFrameMap: Record<string, CircularDataFrame> = {};
          const channelsFrameMaxDataPointsPerSecondMap: Record<string, number> = {};
          // Per channel: has the history back-fill been started yet? (streaming mode only)
          const backfillStarted: Record<string, boolean> = {};
          const isStreaming = options.range.raw.to === 'now';

          const selectedChannels = channelListObservable.value.filter((channel) =>
            target.selectedChannels.includes(channel.name)
          );

          // Appends every stored row of `channel` within [from, to] to `frame`.
          // NOTE(review): `channel`, `from` and `to` are interpolated into the Flux source verbatim;
          // confirm channel names can never contain quotes or `${`.
          const fetchInitialData = (frame: CircularDataFrame, channel: string, from: string, to: string) =>
            new Promise<void>((resolve, reject) => {
              queryApi.queryRows(
                `
                  from(bucket: "recent_one_hour_data")
                    |> range(start: ${from}, stop: ${to})
                    |> filter(fn: (r) => r["_measurement"] == "${channel}")
                    |> keep(columns: ["_time", "_value"])
                  `.trim(),
                {
                  next(row: string[], tableMeta: FluxTableMetaData) {
                    const o = tableMeta.toObject(row);
                    frame.appendRow([o._time, o._value]);
                  },
                  error(error: Error) {
                    console.error(error);
                    reject(error);
                  },
                  complete() {
                    resolve();
                  },
                }
              );
            });

          for (const channel of selectedChannels) {
            const maxDataPointsPerSecond =
              (channel.plottingAttributes?.maxDataPointsPerSecond as number) ?? DEFAULT_MAX_DATA_POINTS_PER_SECOND;
            const frame = createChannelFrame(query.refId, channel, maxDataPointsPerSecond, seconds);

            channelsFrameMap[channel.name] = frame;
            channelsFrameMaxDataPointsPerSecondMap[channel.name] = maxDataPointsPerSecond;
            backfillStarted[channel.name] = false;

            if (!isStreaming) {
              fetchInitialData(frame, channel.name, options.range.from.toISOString(), options.range.to.toISOString())
                .then(() => {
                  subscriber.next({
                    data: Object.values(channelsFrameMap),
                    key: query.refId,
                    state: LoadingState.Done,
                  });
                })
                .catch((err) => {
                  subscriber.error(err);
                });
            }
          }

          // Track plotting-attribute changes (buffer size, colour, visibility) for the frames we own.
          channelListSubscription = channelListObservable.subscribe((channels) => {
            for (const channel of channels) {
              let frame = channelsFrameMap[channel.name] || hiddenChannelFrameMap[channel.name];
              if (!frame) {
                continue;
              }

              // Read through a local default instead of writing `{}` back into the shared channel object.
              const attributes: NonNullable<Channel['plottingAttributes']> = channel.plottingAttributes ?? {};

              const configuredRate = attributes.maxDataPointsPerSecond;
              if (
                typeof configuredRate === 'number' &&
                configuredRate !== channelsFrameMaxDataPointsPerSecondMap[channel.name]
              ) {
                // The buffer capacity is fixed at construction, so a new rate needs a new frame.
                frame = createChannelFrame(query.refId, channel, configuredRate, seconds);

                if (channelsFrameMap[channel.name]) {
                  channelsFrameMap[channel.name] = frame;
                } else {
                  hiddenChannelFrameMap[channel.name] = frame;
                }

                channelsFrameMaxDataPointsPerSecondMap[channel.name] = configuredRate;
              }

              if ('color' in attributes) {
                frame.fields[1].config = {
                  color: {
                    mode: 'fixed',
                    fixedColor: attributes.color as string,
                  },
                };
              }

              if ('hidden' in attributes) {
                if (attributes.hidden) {
                  hiddenChannelFrameMap[channel.name] = frame;
                  delete channelsFrameMap[channel.name];
                } else {
                  channelsFrameMap[channel.name] = frame;
                  delete hiddenChannelFrameMap[channel.name];
                }
              }
            }
          });

          if (!isStreaming) {
            return;
          }

          const [transport, { writer, reader }] = await startTransport(`${WT_API_URL}/subscribe`);
          if (disposed) {
            // Unsubscribed while connecting: the teardown could not see this transport, close it here.
            transport.close();
            return;
          }
          wt = transport;

          const textEncoder = new TextEncoder();

          await writer.write(
            textEncoder.encode(
              JSON.stringify({
                channels: selectedChannels.map((channel) => channel.name),
              }) + '\n'
            )
          );
          if (disposed) {
            return;
          }

          // Start a channel's history back-fill once its first live sample has arrived.
          backfillTimer = setInterval(() => {
            const pending = Object.keys(backfillStarted).filter((name) => !backfillStarted[name]);
            if (pending.length === 0) {
              clearInterval(backfillTimer);
              return;
            }

            for (const name of pending) {
              const frame = channelsFrameMap[name] || hiddenChannelFrameMap[name];
              if (!frame || !frame.fields[0].values[0]) {
                continue;
              }

              backfillStarted[name] = true;
              fetchInitialData(frame, name, `-${seconds}s`, 'now()').catch((err) => {
                subscriber.error(err);
              });
            }
          }, 10);

          // Emit `refreshRate` times per second; a missing, zero or invalid rate means once per second.
          const refreshRate = Number(query.refreshRate);
          const emitsPerSecond = Number.isFinite(refreshRate) && refreshRate > 0 ? refreshRate : 1;

          emitTimer = setInterval(() => {
            subscriber.next({
              data: Object.values(channelsFrameMap),
              key: query.refId,
              state: LoadingState.Streaming,
            });
          }, 1000 / emitsPerSecond);

          const textDecoder = new TextDecoder();

          let buffer = '';

          while (true) {
            const { value, done } = await reader.read();
            if (done) {
              break;
            }

            // `stream: true` keeps a multi-byte character that is split across two chunks intact.
            buffer += textDecoder.decode(value, { stream: true });

            // Messages are newline-delimited; the last piece is an incomplete message (or empty).
            const lines = buffer.split('\n');
            buffer = lines.pop() ?? '';

            for (const line of lines) {
              if (line.trim() === '') {
                continue;
              }

              let message: { time: unknown; values?: Array<{ c: string; v: number }> };
              try {
                message = JSON.parse(line);
              } catch (error) {
                console.error('Skipping malformed stream message', error);
                continue;
              }

              for (const sample of message.values ?? []) {
                const frame = channelsFrameMap[sample.c] || hiddenChannelFrameMap[sample.c];
                if (!frame) {
                  // Sample for a channel this query has no frame for.
                  continue;
                }
                frame.appendRow([message.time, sample.v]);
              }
            }
          }
        };

        run().catch(logUnlessDisposed);

        return () => {
          disposed = true;
          clearTimeout(retryTimer);
          clearInterval(backfillTimer);
          clearInterval(emitTimer);
          channelListSubscription?.unsubscribe();
          wt?.close();
        };
      });
    });

    return merge(...observables);
  }

  /** Health check; always reports success without contacting any backend. */
  async testDatasource() {
    // Implement a health check for your data source.
    return {
      status: 'success',
      message: 'Success',
    };
  }
}
