import MetricsApi from "modules/api/MetricsApi";
import {
	fetchLastCommittedMetricsSucceeded,
	fetchWsrepLocalStateMetricsSuccess
} from "modules/metricsStore/actions";
import { isMonitorEnabledSelector } from "modules/metricsStore/selectors";
import { METRICS_ACTION } from "modules/metricsStore/storeTypes";
import {
	METRIC,
	WsrepLastCommittedMetric,
	WsrepLocalStateMetric
} from "modules/metricsStore/types";
import moment from "moment";
import { takeLatest } from "redux-saga/effects";
import { call, delay, put, select } from "typed-redux-saga";

const POLLING_INTERVAL = 3000;

function* monitorLastCommitted() {
	// console.log("Last committed metrics monitor started");

	let monitor: boolean = yield select(isMonitorEnabledSelector);

	const query = `SELECT LAST(wsrep_last_committed), cluster, node FROM mysql GROUP BY "node", "cluster"`;

	while (monitor) {
		try {
			const response = yield* call(MetricsApi.fetch, query);
			// console.log(
			// 	"metrics fetch response",
			// 	response.data.results[0].series
			// );

			const data = response.data.results[0].series;

			const metricsRecord: Record<string, WsrepLastCommittedMetric> = {};

			data?.forEach((res: any, index: number) => {
				const metric: WsrepLastCommittedMetric = {
					type: METRIC.WSREP_LAST_COMMITTED,
					time: new Date(res.values[0][data[index].columns.indexOf("time")]),
					value: res.values[0][data[index].columns.indexOf("last")],
					cluster: res.values[0][data[index].columns.indexOf("cluster")],
					node: res.values[0][data[index].columns.indexOf("node")]
				};

				metricsRecord[`${metric.cluster},${metric.node}`] = metric;

				if (
					metricsRecord[metric.cluster] &&
					metricsRecord[metric.cluster].value > metric.value
				) {
					// skip if this metric is lower than one already in metric record store
				} else {
					metricsRecord[metric.cluster] = metric;
				}
			});

			// console.log("last committed metrics", metrics);

			yield put(fetchLastCommittedMetricsSucceeded(metricsRecord));
		} catch (e) {
			console.error("Last committed monitor error:", e);
		} finally {
			yield delay(POLLING_INTERVAL);
			monitor = yield select(isMonitorEnabledSelector);
		}
	}
}

function* monitorLocalState() {
	// console.log("Local state metrics monitor started");

	// console.log("query", query);
	let monitor: boolean = yield select(isMonitorEnabledSelector);
	// console.log("Local state metrics monitor started", monitor);

	while (monitor) {
		try {
			const query = `SELECT LAST(wsrep_local_state), node, cluster FROM mysql WHERE time > ${moment()
				.subtract(3, "second")
				.valueOf()}000000 GROUP BY "node", "cluster"`;

			const response = yield* call(MetricsApi.fetch, query);
			// console.log(
			// 	"metrics fetch response",
			// 	response,
			// 	response.data.results[0].series
			// );

			const data = response.data.results[0].series;

			const metrics =
				data?.map(
					(res: any, index: number): WsrepLocalStateMetric => ({
						type: METRIC.WSREP_LOCAL_STATE,
						time: new Date(res.values[0][data[index].columns.indexOf("time")]),
						value: res.values[0][1],
						node: res.values[0][data[index].columns.indexOf("node")],
						cluster: res.values[0][data[index].columns.indexOf("cluster")]
					})
				) || [];

			// console.log("local state metrics", metrics);

			yield put(fetchWsrepLocalStateMetricsSuccess(metrics));
		} catch (e) {
			console.error("Local state monitor error:", e);
		} finally {
			yield delay(POLLING_INTERVAL);
			monitor = yield select(isMonitorEnabledSelector);
		}
	}
}

function* MetricsStoreSideEffects() {
	yield takeLatest(METRICS_ACTION.START_MONITOR, monitorLastCommitted);
	yield takeLatest(METRICS_ACTION.START_MONITOR, monitorLocalState);
}

export default MetricsStoreSideEffects;
