From 06f79923bbbf5330210140212405a8805e4d1776 Mon Sep 17 00:00:00 2001 From: kacperlo Date: Tue, 10 Jun 2025 15:49:31 +0200 Subject: [PATCH] Fix anomaly detectors --- .../com/anomaly/detector/AnomalyDetector.java | 216 ++++++++++++------ 1 file changed, 148 insertions(+), 68 deletions(-) diff --git a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java index 04058140..c7128331 100644 --- a/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java +++ b/anomaly-detector/src/main/java/com/anomaly/detector/AnomalyDetector.java @@ -21,10 +21,16 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; import java.util.*; import java.time.Instant; import java.io.IOException; +import java.io.Serializable; public class AnomalyDetector { @@ -81,13 +87,13 @@ public class AnomalyDetector { // 1. Amount anomaly - sudden high-value transactions DataStream amountAlerts = transactionStream .keyBy(Transaction::getCardId) - .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(1))) + .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1))) .process(new AmountAnomalyDetector()); // 2. Location anomaly - sudden change in location DataStream locationAlerts = transactionStream .keyBy(Transaction::getCardId) - .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(1))) + .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1))) .process(new LocationAnomalyDetector()); // 3. Frequency anomaly - unusual number of transactions in short time @@ -120,28 +126,28 @@ public class AnomalyDetector { } // Detector for unusual transaction amounts - public static class AmountAnomalyDetector + public static class AmountAnomalyDetector extends ProcessWindowFunction { - + @Override - public void process(String cardId, Context context, Iterable transactions, + public void process(String cardId, Context context, Iterable transactions, Collector out) { List transactionList = new ArrayList<>(); transactions.forEach(transactionList::add); - + if (transactionList.isEmpty()) return; - + // Calculate statistics double averageAmount = transactionList.stream() .mapToDouble(Transaction::getAmount) .average() .orElse(0); - + double stdDeviation = calculateStdDeviation(transactionList, averageAmount); - - // Check for anomalies (transactions that are more than 3 standard deviations from mean) + + // Check for anomalies (transactions that are more than 1.7 standard deviations from mean) for (Transaction transaction : transactionList) { - if (stdDeviation > 0 && Math.abs(transaction.getAmount() - averageAmount) > 3 * stdDeviation) { + if (stdDeviation > 0 && Math.abs(transaction.getAmount() - averageAmount) > 2 * stdDeviation && transaction.getAmount() > averageAmount && transaction.getAmount() > 1000) { out.collect(new TransactionAlert( "AMOUNT_ANOMALY", transaction.getCardId(), @@ -150,13 +156,13 @@ public class AnomalyDetector { transaction.getLatitude(), transaction.getLongitude(), transaction.getTimestamp(), - "Unusual transaction amount detected: $" + transaction.getAmount() + + "Unusual transaction amount detected: $" + transaction.getAmount() + " (Average: $" + String.format("%.2f", averageAmount) + ")" )); } } } - + private double calculateStdDeviation(List transactions, double mean) { return Math.sqrt(transactions.stream() .mapToDouble(t -> Math.pow(t.getAmount() - mean, 2)) @@ -166,87 +172,153 @@ public class AnomalyDetector { } // Detector for unusual transaction locations - public static class LocationAnomalyDetector + public static class LocationAnomalyDetector extends ProcessWindowFunction { - - // Map to store frequent locations for each card - private final Map> cardLocations = new HashMap<>(); - + + private transient MapState> knownLocations; + private static final int MAX_KNOWN_LOCATIONS = 5; // Limit known locations to avoid memory issues + private static final double ANOMALY_DISTANCE_THRESHOLD = 50.0; // Threshold in km + private static final int MIN_LOCATIONS_FOR_DETECTION = 3; // Minimum known locations before detecting anomalies + @Override - public void process(String cardId, Context context, Iterable transactions, - Collector out) { + public void open(Configuration parameters) throws Exception { + MapStateDescriptor> descriptor = + new MapStateDescriptor<>( + "knownLocations", + TypeInformation.of(String.class), + TypeInformation.of(new TypeHint>() {}) + ); + knownLocations = getRuntimeContext().getMapState(descriptor); + } + + @Override + public void process(String cardId, Context context, Iterable transactions, + Collector out) throws Exception { List transactionList = new ArrayList<>(); transactions.forEach(transactionList::add); - + if (transactionList.isEmpty()) return; - + // Get or create location set for this card - Set frequentLocations = cardLocations.computeIfAbsent(cardId, k -> new HashSet<>()); - + Set cardKnownLocations; + if (knownLocations.contains(cardId)) { + cardKnownLocations = knownLocations.get(cardId); + System.out.println("Card " + cardId + " has " + cardKnownLocations.size() + " known locations"); + } else { + cardKnownLocations = new HashSet<>(); + System.out.println("New card detected: " + cardId + ", initializing known locations"); + } + // Process each transaction for (Transaction transaction : transactionList) { LocationPoint currentPoint = new LocationPoint(transaction.getLatitude(), transaction.getLongitude()); - - // If we have at least 3 frequent locations for this card - if (frequentLocations.size() >= 3) { - boolean isNearKnownLocation = false; - - // Check if current location is near any known frequent location - for (LocationPoint knownPoint : frequentLocations) { - if (calculateDistance(currentPoint, knownPoint) < 50) { // Less than 50km - isNearKnownLocation = true; + + // First few transactions establish the baseline locations + if (cardKnownLocations.size() < MIN_LOCATIONS_FOR_DETECTION) { + System.out.println("Building baseline for card " + cardId + ", adding location #" + + (cardKnownLocations.size() + 1) + " to known locations"); + + // Check if this location is already very close to a known location before adding + boolean isVeryCloseToKnown = false; + for (LocationPoint knownPoint : cardKnownLocations) { + if (calculateDistance(currentPoint, knownPoint) < 2.0) { // Within 2km = same area + isVeryCloseToKnown = true; + System.out.println("Location is very close to existing baseline location, not adding duplicate"); break; } } - - // If not near any known location, it might be an anomaly - if (!isNearKnownLocation) { - out.collect(new TransactionAlert( - "LOCATION_ANOMALY", - transaction.getCardId(), - transaction.getUserId(), - transaction.getAmount(), - transaction.getLatitude(), - transaction.getLongitude(), - transaction.getTimestamp(), - "Unusual transaction location detected at: " + - transaction.getLatitude() + ", " + transaction.getLongitude() - )); + + // Only add distinct baseline locations + if (!isVeryCloseToKnown) { + cardKnownLocations.add(currentPoint); + } + + // We're still building the baseline, don't check for anomalies yet + continue; + } + + // Check distance to known locations + double closestDistance = Double.MAX_VALUE; + LocationPoint closestPoint = null; + + for (LocationPoint knownPoint : cardKnownLocations) { + double distance = calculateDistance(currentPoint, knownPoint); + if (distance < closestDistance) { + closestDistance = distance; + closestPoint = knownPoint; } } - - // Add current location to frequent locations (max 10 locations per card) - if (frequentLocations.size() < 10) { - frequentLocations.add(currentPoint); + + System.out.println("CARD " + cardId + ": Transaction at " + currentPoint + ", closest known location: " + + closestPoint + " (" + String.format("%.2f", closestDistance) + " km)"); + + // Detect anomaly if transaction is far from all known locations + if (closestDistance > ANOMALY_DISTANCE_THRESHOLD) { + System.out.println("⚠️ LOCATION ANOMALY DETECTED: Distance " + + String.format("%.2f", closestDistance) + "km exceeds threshold of " + + ANOMALY_DISTANCE_THRESHOLD + "km"); + + out.collect(new TransactionAlert( + "LOCATION_ANOMALY", + transaction.getCardId(), + transaction.getUserId(), + transaction.getAmount(), + transaction.getLatitude(), + transaction.getLongitude(), + transaction.getTimestamp(), + "Unusual transaction location: " + String.format("%.2f", closestDistance) + + "km from nearest known location" + )); + + // Don't automatically add anomalous locations to known locations + } else { + // Check if this location is already very close to a known location + boolean isVeryCloseToKnown = false; + for (LocationPoint knownPoint : cardKnownLocations) { + if (calculateDistance(currentPoint, knownPoint) < 2.0) { // Within 2km = same area + isVeryCloseToKnown = true; + break; + } + } + + // Only add distinct new locations, up to our maximum + if (!isVeryCloseToKnown && cardKnownLocations.size() < MAX_KNOWN_LOCATIONS) { + cardKnownLocations.add(currentPoint); + System.out.println("Added new location to known locations: " + currentPoint); + } } } + + // Update the state + knownLocations.put(cardId, cardKnownLocations); } - + // Calculate distance between two points using Haversine formula (in km) private double calculateDistance(LocationPoint p1, LocationPoint p2) { final int R = 6371; // Earth radius in km - + double latDistance = Math.toRadians(p2.latitude - p1.latitude); double lonDistance = Math.toRadians(p2.longitude - p1.longitude); - + double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2) + Math.cos(Math.toRadians(p1.latitude)) * Math.cos(Math.toRadians(p2.latitude)) * Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2); - + double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); - + return R * c; } - - private static class LocationPoint { + + private static class LocationPoint implements Serializable { + private static final long serialVersionUID = 1L; private final double latitude; private final double longitude; - + public LocationPoint(double latitude, double longitude) { this.latitude = latitude; this.longitude = longitude; } - + @Override public boolean equals(Object o) { if (this == o) return true; @@ -255,35 +327,43 @@ public class AnomalyDetector { return Double.compare(that.latitude, latitude) == 0 && Double.compare(that.longitude, longitude) == 0; } - + @Override public int hashCode() { return Objects.hash(latitude, longitude); } + + @Override + public String toString() { + return "LocationPoint{" + + "lat=" + latitude + + ", lon=" + longitude + + '}'; + } } } // Detector for unusual transaction frequency - public static class FrequencyAnomalyDetector + public static class FrequencyAnomalyDetector extends ProcessWindowFunction { - + @Override - public void process(String cardId, Context context, Iterable transactions, + public void process(String cardId, Context context, Iterable transactions, Collector out) { List transactionList = new ArrayList<>(); transactions.forEach(transactionList::add); - + // Get window info long windowStart = context.window().getStart(); long windowEnd = context.window().getEnd(); long windowSizeMinutes = (windowEnd - windowStart) / (1000 * 60); - + // If there are more than 5 transactions in 5 minutes for the same card, flag it if (transactionList.size() > 5) { Transaction latestTransaction = transactionList.stream() .max(Comparator.comparing(Transaction::getTimestamp)) .orElse(transactionList.get(0)); - + out.collect(new TransactionAlert( "FREQUENCY_ANOMALY", latestTransaction.getCardId(), @@ -292,7 +372,7 @@ public class AnomalyDetector { latestTransaction.getLatitude(), latestTransaction.getLongitude(), latestTransaction.getTimestamp(), - "Unusual transaction frequency detected: " + transactionList.size() + + "Unusual transaction frequency detected: " + transactionList.size() + " transactions in " + windowSizeMinutes + " minutes" )); }