[quote][pid=826683520,44291545,1]Reply[/pid] Post by [uid=41976057]幸运D的yby[/uid] (2025-06-08 10:50):
难怪我感觉他有些帖子的内容不像是不会编程能做出来的[/quote]弱智就是弱智,连会编程是什么意思都不知。
你不会以为,敲几句代码,本地部署一下,装点插件,装点依赖,编译一下,做点一键包,就叫会编程吧?
我做这些有手就会的基础内容教程,错在没收你们钱?
会编程,意味着起码得从零开始写代码,会优化代码,那就优化下代码提高正确率?
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import csv
import argparse
from datetime import datetime
import re
import shutil
import hashlib
import platform
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Default for the --input command-line option. This assignment must stay
# active: the argument parser reads it when the script starts.
DEFAULT_INPUT_DIR = "input_images"
# Default for the --output command-line option.
DEFAULT_OUTPUT_DIR = "detection_results"
# Name of the per-image CSV report created inside the output directory.
REPORT_FILE = "watermark_report.csv"
# Not referenced by the code visible in this file.
DEBUG_MODE = False
# When true, each spectrum is additionally saved as a .npy array.
SAVE_RAW_SPECTRUM = True
# When false, images are only transformed and plotted, never scored.
WATERMARK_DETECTION = True
# An image is reported as watermarked when its confidence exceeds this value.
WATERMARK_THRESHOLD = 0.85
# Not referenced by the code visible in this file.
AUTO_RENAME_FILES = True
# Watermark detection parameters: each feature is divided by its threshold
# (and capped at 1.0) before being weighted into the confidence score.
CROSS_INTENSITY_THRESH = 0.15
RADIAL_VARIANCE_THRESH = 0.2
CENTER_INTENSITY_THRESH = 0.3
def sanitize_filename(filename):
    """Build a filesystem-safe file name for the file at *filename*.

    The result has the form ``<hash>_<stem><ext>``:

    * ``<hash>`` is the first 8 hex digits of the MD5 of the file content,
    * ``<stem>`` is the base name without extension, with every character
      other than word characters, whitespace and ``-`` removed and each
      whitespace run replaced by ``_`` (stems longer than 50 characters are
      shortened to their first and last 20 characters joined by ``...``),
    * ``<ext>`` is the original extension, unchanged.

    Args:
        filename: Path of the file to name.

    Returns:
        The safe file name without any directory part. If the file cannot be
        read, ``file_<HHMMSSffffff><ext>`` built from the current time is
        returned instead.
    """
    # Split before touching the file so the fallback branch can use ``ext``.
    stem, ext = os.path.splitext(os.path.basename(filename))
    try:
        digest = hashlib.md5()
        with open(filename, 'rb') as f:
            # Hash in 1 MiB chunks so large images are not loaded whole.
            for chunk in iter(lambda: f.read(1 << 20), b''):
                digest.update(chunk)
    except (OSError, ValueError):
        # Unreadable file (or MD5 unavailable): fall back to a time-based name.
        return f"file_{datetime.now().strftime('%H%M%S%f')}{ext}"
    # Clean only the stem; cleaning the full base name would strip the dot and
    # glue the extension onto the name a second time.
    safe_name = re.sub(r'[^\w\s-]', '', stem)
    safe_name = re.sub(r'\s+', '_', safe_name)
    if len(safe_name) > 50:
        safe_name = safe_name[:20] + "..." + safe_name[-20:]
    return f"{digest.hexdigest()[:8]}_{safe_name}{ext}"
def copy_and_rename_files(input_dir, temp_dir):
    """Copy every image directly inside *input_dir* into *temp_dir* under a safe name.

    Only regular files whose lower-cased name ends in one of the known image
    extensions are copied; sub-directories are not descended into.

    Args:
        input_dir: Directory to scan.
        temp_dir: Destination directory; created if it does not exist.

    Returns:
        A dict mapping each source path to ``(copied_path, original_file_name)``.
    """
    os.makedirs(temp_dir, exist_ok=True)
    image_suffixes = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.webp')
    mapping = {}
    for entry in os.listdir(input_dir):
        source = os.path.join(input_dir, entry)
        if not os.path.isfile(source):
            continue
        if not entry.lower().endswith(image_suffixes):
            continue
        target = os.path.join(temp_dir, sanitize_filename(source))
        shutil.copy2(source, target)
        mapping[source] = (target, entry)
    return mapping
def detect_watermark(spectrum):
    """Score a centred 2-D magnitude spectrum for watermark-like features.

    Four features are measured:

    * ``center_intensity`` - value at the centre pixel divided by the maximum.
    * ``cross_intensity`` - average of the mean horizontal and mean vertical
      segment through the centre (up to 10 pixels each side), divided by the
      maximum.
    * ``angular_variance`` - variance of all values farther than 10 pixels
      from the centre.
    * ``symmetry_diff`` - mean absolute difference between the block above and
      right of the centre and the 180-degree-rotated block below and left.

    The confidence is a weighted blend (0.4 / 0.4 / 0.1 / 0.1) of the features
    after normalising by the module thresholds, clamped to ``[0, 1]``.

    Args:
        spectrum: 2-D numeric array. The caller in this file passes a
            log-magnitude spectrum scaled to ``[0, 1]``.

    Returns:
        ``(is_watermark, confidence, features)`` where ``is_watermark`` is
        ``confidence > WATERMARK_THRESHOLD`` and ``features`` maps the four
        feature names to floats. ``(False, 0.0, {})`` is returned when the
        spectrum cannot be scored: it is not 2-D, is empty, has no positive
        value, is too small for the feature windows, or an unexpected error
        occurs.
    """
    try:
        height, width = spectrum.shape
        center_y, center_x = height // 2, width // 2

        # Every ratio below divides by the maximum; without a positive
        # maximum the features would be NaN/inf instead of a usable score.
        max_value = float(np.max(spectrum))
        if max_value <= 0:
            return False, 0.0, {}

        # The outer-region mask and the quadrant blocks are empty for very
        # small inputs, which would also yield NaN statistics.
        y, x = np.indices(spectrum.shape)
        r = np.sqrt((x - center_x) ** 2 + (y - center_y) ** 2)
        mask = r > 10
        quadrant_size = min(center_y, center_x) // 2
        if quadrant_size == 0 or not mask.any():
            return False, 0.0, {}

        # Feature 1: Center intensity
        center_intensity = float(spectrum[center_y, center_x]) / max_value

        # Feature 2: Cross detection
        horizontal_line = spectrum[center_y, max(0, center_x - 10):min(width, center_x + 11)]
        vertical_line = spectrum[max(0, center_y - 10):min(height, center_y + 11), center_x]
        h_line_mean = float(np.mean(horizontal_line)) / max_value
        v_line_mean = float(np.mean(vertical_line)) / max_value
        cross_intensity = (h_line_mean + v_line_mean) / 2

        # Feature 3: Radial symmetry (variance outside the 10-pixel core)
        angular_variance = float(np.var(spectrum[mask]))

        # Feature 4: Symmetry
        # NOTE(review): these two blocks are offset by one pixel from an exact
        # point reflection through (center_y, center_x) - confirm intended.
        q1 = spectrum[center_y - quadrant_size:center_y, center_x:center_x + quadrant_size]
        q3 = spectrum[center_y:center_y + quadrant_size, center_x - quadrant_size:center_x]
        symmetry_diff = float(np.mean(np.abs(q1 - np.rot90(q3, 2))))

        # Feature scoring
        features = {
            'center_intensity': center_intensity,
            'cross_intensity': cross_intensity,
            'angular_variance': angular_variance,
            'symmetry_diff': symmetry_diff
        }

        # Confidence calculation: strong centre/cross raise the score, high
        # outer variance and high asymmetry lower it.
        confidence = (
            0.4 * min(center_intensity / CENTER_INTENSITY_THRESH, 1.0) +
            0.4 * min(cross_intensity / CROSS_INTENSITY_THRESH, 1.0) +
            0.1 * (1 - min(angular_variance / RADIAL_VARIANCE_THRESH, 1.0)) +
            0.1 * (1 - min(symmetry_diff / 0.1, 1.0))
        )
        confidence = min(max(confidence, 0.0), 1.0)
        is_watermark = bool(confidence > WATERMARK_THRESHOLD)
        return is_watermark, confidence, features
    except Exception:
        # Best-effort contract: detection never raises; callers treat this
        # result as "not detected".
        return False, 0.0, {}
def process_image(image_path, original_name, output_dir, report_writer=None):
    """Compute, plot and optionally score the Fourier spectrum of one image.

    The image is decoded as grayscale, transformed with a shifted 2-D FFT and
    turned into a log-magnitude spectrum scaled to 0..255. A three-panel figure
    (original, spectrum, colour-mapped spectrum) is saved to *output_dir*.
    When ``SAVE_RAW_SPECTRUM`` is set the spectrum is also saved as ``.npy``,
    and when *report_writer* is given one row is appended to the report.

    Args:
        image_path: Path of the image file to read.
        original_name: Original file name; its stem and extension name the
            outputs and it is the name written to the report.
        output_dir: Existing directory that receives the output files.
        report_writer: Optional object with a ``writerow(list)`` method.

    Returns:
        ``(output_img_path, status, is_watermark, confidence)``. On success
        ``status`` is ``"Success"``. If decoding yields no image the result is
        ``(None, "Read failed", False, 0.0)``; any other failure gives
        ``(None, "Error: <message>", False, 0.0)``.
    """
    try:
        # Read image: decode from a byte buffer, falling back to a plain
        # file read if numpy cannot load the path directly.
        try:
            img = cv2.imdecode(np.fromfile(image_path, dtype=np.uint8), cv2.IMREAD_GRAYSCALE)
        except Exception:
            with open(image_path, 'rb') as image_file:
                img_array = np.frombuffer(image_file.read(), dtype=np.uint8)
            img = cv2.imdecode(img_array, cv2.IMREAD_GRAYSCALE)
        if img is None:
            return None, "Read failed", False, 0.0

        filename = os.path.basename(original_name)
        name, ext = os.path.splitext(filename)

        # Fourier transform: shift the zero frequency to the centre and
        # log-compress the magnitudes before scaling to 0..255.
        fshift = np.fft.fftshift(np.fft.fft2(img))
        magnitude_spectrum = np.log(1 + np.abs(fshift))
        magnitude_spectrum = cv2.normalize(magnitude_spectrum, None, 0, 255, cv2.NORM_MINMAX)
        normalized_spectrum = magnitude_spectrum / 255.0

        # Watermark detection
        is_watermark = False
        confidence = 0.0
        features = {}
        if WATERMARK_DETECTION:
            is_watermark, confidence, features = detect_watermark(normalized_spectrum)
        watermark_status = "Detected" if is_watermark else "Not detected"

        # Save spectrum visualization
        fig = plt.figure(figsize=(14, 7))
        try:
            panels = (
                (131, img, 'gray', 'Original Image'),
                (132, magnitude_spectrum, 'gray', 'Spectrum'),
                (133, magnitude_spectrum, 'jet', 'Enhanced'),
            )
            for position, data, cmap, title in panels:
                plt.subplot(position)
                plt.imshow(data, cmap=cmap)
                plt.title(title)
                plt.axis('off')
            if WATERMARK_DETECTION:
                plt.figtext(0.5, 0.01, f"Watermark: {watermark_status} | Confidence: {confidence:.2f}",
                            ha="center", fontsize=12,
                            color="red" if confidence > WATERMARK_THRESHOLD else "green",
                            bbox={"facecolor": "yellow", "alpha": 0.3, "pad": 5})
            # savefig picks the format from the extension and rejects formats
            # the canvas cannot write (e.g. .bmp); fall back to PNG for those.
            supported = fig.canvas.get_supported_filetypes()
            out_ext = ext if ext.lower().lstrip('.') in supported else '.png'
            output_img_path = os.path.join(output_dir, f"{name}_spectrum{out_ext}")
            plt.savefig(output_img_path, bbox_inches='tight', dpi=150)
        finally:
            # Always release the figure, even when plotting or saving fails,
            # so a long batch does not accumulate open figures.
            plt.close(fig)

        # Save raw data if enabled
        if SAVE_RAW_SPECTRUM:
            raw_data_path = os.path.join(output_dir, f"{name}_spectrum.npy")
            np.save(raw_data_path, magnitude_spectrum)

        # Write to report
        if report_writer:
            img_size = f"{img.shape[1]}x{img.shape[0]}"
            report_data = [
                filename, img_size, watermark_status, f"{confidence:.4f}",
                f"{features.get('center_intensity', 0):.4f}",
                f"{features.get('cross_intensity', 0):.4f}",
                f"{features.get('angular_variance', 0):.4f}",
                f"{features.get('symmetry_diff', 0):.4f}",
                datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            ]
            report_writer.writerow(report_data)
        return output_img_path, "Success", is_watermark, confidence
    except Exception as e:
        # One bad image must not abort a batch; report the failure instead.
        return None, f"Error: {str(e)}", False, 0.0
def batch_process_images(input_dir, output_dir):
    """Process every image in *input_dir* and sort the results into *output_dir*.

    Images are first copied under safe names into ``<output_dir>/temp``, then
    each copy is processed. Successfully processed images are copied, under
    their original name, into ``<output_dir>/watermarked`` or
    ``<output_dir>/clean``. A CSV report (``REPORT_FILE``) and a
    ``summary.txt`` are written to *output_dir*. The temp directory is removed
    before returning, including when an error propagates.

    Args:
        input_dir: Directory containing the images to analyse.
        output_dir: Directory for all outputs; created if missing.

    Returns:
        ``(success_count, results)``. ``results`` holds one tuple per image:
        ``(original_name, status, result_path, is_watermark, confidence)``,
        with ``("", False, 0.0)`` as the last three fields for images that
        failed. ``(0, [])`` is returned when no images are found.
    """
    os.makedirs(output_dir, exist_ok=True)
    watermarked_dir = os.path.join(output_dir, "watermarked")
    clean_dir = os.path.join(output_dir, "clean")
    os.makedirs(watermarked_dir, exist_ok=True)
    os.makedirs(clean_dir, exist_ok=True)
    temp_dir = os.path.join(output_dir, "temp")
    report_path = os.path.join(output_dir, REPORT_FILE)

    success_count = 0
    watermark_count = 0
    confidence_total = 0.0
    results = []
    try:
        os.makedirs(temp_dir, exist_ok=True)
        file_mapping = copy_and_rename_files(input_dir, temp_dir)
        if not file_mapping:
            print("No images found!")
            return 0, []

        with open(report_path, 'w', newline='', encoding='utf-8') as report_file:
            csv_writer = csv.writer(report_file)
            csv_writer.writerow(['Filename', 'Size', 'Watermark', 'Confidence',
                                 'Center', 'Cross', 'Radial', 'Symmetry', 'Time'])
            for original_path, (temp_path, original_name) in tqdm(file_mapping.items(), desc="Processing"):
                result_path, status, is_watermark, confidence = process_image(
                    temp_path, original_name, output_dir, report_writer=csv_writer)
                if status != "Success":
                    results.append((original_name, status, "", False, 0.0))
                    continue

                success_count += 1
                confidence_total += confidence
                target_dir = watermarked_dir if is_watermark else clean_dir
                target_path = os.path.join(target_dir, original_name)
                os.makedirs(os.path.dirname(target_path), exist_ok=True)
                try:
                    shutil.copy2(temp_path, target_path)
                except OSError:
                    # The staged copy could not be used; retry from the source.
                    try:
                        shutil.copy2(original_path, target_path)
                    except OSError as e:
                        status = f"Copy failed: {e}"
                if is_watermark:
                    watermark_count += 1
                results.append((original_name, status, result_path, is_watermark, confidence))
    finally:
        # Staged copies are only needed while processing.
        shutil.rmtree(temp_dir, ignore_errors=True)

    # Generate summary. The average covers successfully processed images only.
    average_confidence = confidence_total / success_count if success_count else 0.0
    summary_lines = [
        "Implicit Watermark Detection Report",
        "Project: Implicit_watermark_detection",
        f"Analysis Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"Total Images: {len(file_mapping)}",
        f"Watermarked Images: {watermark_count}",
        f"Clean Images: {success_count - watermark_count}",
        f"Watermark Confidence: Avg {average_confidence:.2f}",
        f"Detailed Report: {report_path}",
        f"Watermarked Folder: {watermarked_dir}",
        f"Clean Folder: {clean_dir}",
    ]
    with open(os.path.join(output_dir, "summary.txt"), 'w', encoding='utf-8') as f:
        f.write("\n".join(summary_lines))
    return success_count, results
if __name__ == "__main__":
    # Command-line entry point: parse the directories, run the batch and
    # print a short result overview.
    parser = argparse.ArgumentParser(description='Implicit Watermark Detection Tool')
    parser.add_argument('--input', default=DEFAULT_INPUT_DIR, help='Input image directory')
    parser.add_argument('--output', default=DEFAULT_OUTPUT_DIR, help='Output directory')
    args = parser.parse_args()
    # Fail with a usage message before any output directory is created,
    # rather than with a traceback from the directory listing later on.
    if not os.path.isdir(args.input):
        parser.error(f"Input directory not found: {args.input}")

    print("=" * 50)
    print("Project: Implicit_watermark_detection")
    print(f"OS: {platform.system()} {platform.release()}")
    print(f"Python: {platform.python_version()}")
    print(f"Processing: {args.input} -> {args.output}")
    print("=" * 50)

    success_count, results = batch_process_images(args.input, args.output)
    print("\nProcessing complete!")
    print(f"Successfully processed {success_count} images")
    if WATERMARK_DETECTION and success_count > 0:
        # Result tuples are (name, status, path, is_watermark, confidence).
        # Sort so that "Top 5" really lists the highest confidences.
        watermark_samples = sorted(
            (r for r in results if r[3]), key=lambda r: r[4], reverse=True)
        if watermark_samples:
            print(f"\nDetected {len(watermark_samples)} watermarked images")
            print("Top 5 samples:")
            for sample in watermark_samples[:5]:
                print(f" - {sample[0]} (Confidence: {sample[4]:.2f})")