| SERVER (Sender):
|
| import socket
|
| import pickle
|
| import random
|
|
|
def miller_rabin_test(n, k=5):
    """Run a Miller-Rabin probabilistic primality test on ``n``.

    Args:
        n: Integer to test.
        k: Number of random bases to try.

    Returns:
        False when ``n`` is below 2, even (other than 2), or shown composite
        by one of the bases; True when ``n`` is 2, 3, or passed every round.
    """
    if n in (2, 3):
        return True
    if n < 2 or n % 2 == 0:
        return False

    # Write n - 1 as 2**exponent * odd_part with odd_part odd.
    exponent = 0
    odd_part = n - 1
    while odd_part % 2 == 0:
        odd_part //= 2
        exponent += 1

    minus_one = n - 1
    for _round in range(k):
        base = random.randrange(2, n - 1)
        value = pow(base, odd_part, n)
        if value in (1, minus_one):
            continue

        # Square up to exponent - 1 times looking for -1 (mod n).
        reached_minus_one = False
        for _step in range(exponent - 1):
            value = pow(value, 2, n)
            if value == minus_one:
                reached_minus_one = True
                break
        if not reached_minus_one:
            # This base proves n composite.
            return False
    return True
|
def generate_prime(bits):
    """Return a random probable prime that is exactly ``bits`` bits long.

    Candidates are drawn with ``random.getrandbits`` and have their top bit
    (to fix the bit length) and lowest bit (to make them odd) forced to 1;
    the first candidate accepted by ``miller_rabin_test`` is returned.

    Args:
        bits: Desired bit length of the prime; must be at least 2.

    Returns:
        An odd integer of bit length ``bits`` that passed the primality test.

    Raises:
        ValueError: If ``bits`` is smaller than 2. With ``bits == 1`` the only
            candidate is 1, which is never prime, so the search would never
            terminate; non-positive values make the bit shift invalid.
    """
    if bits < 2:
        raise ValueError("bits must be at least 2 to generate a prime")

    # Top bit pins the length, low bit guarantees an odd candidate.
    forced_bits = (1 << (bits - 1)) | 1
    while True:
        candidate = random.getrandbits(bits) | forced_bits
        if miller_rabin_test(candidate):
            return candidate
|
def mod_inverse(a, m):
    """Return the multiplicative inverse of ``a`` modulo ``m``.

    Uses the iterative extended Euclidean algorithm, so operand size is not
    limited by the interpreter's recursion depth.

    Args:
        a: Integer to invert; it is reduced modulo ``m`` first, so negative
            values are accepted.
        m: Modulus; must be a positive integer.

    Returns:
        The integer ``x`` with ``0 <= x < m`` and ``(a * x) % m == 1 % m``.

    Raises:
        ValueError: If ``m`` is not positive, or if ``a`` and ``m`` are not
            coprime (no inverse exists).
    """
    if m <= 0:
        raise ValueError("modulus must be a positive integer")

    # Invariant: old_coeff * a == old_rem (mod m) and coeff * a == rem (mod m).
    old_rem, rem = a % m, m
    old_coeff, coeff = 1, 0
    while rem:
        quotient = old_rem // rem
        old_rem, rem = rem, old_rem - quotient * rem
        old_coeff, coeff = coeff, old_coeff - quotient * coeff

    # old_rem is now gcd(a, m); an inverse exists only when it equals 1.
    if old_rem != 1:
        raise ValueError("value is not invertible for the given modulus")
    return old_coeff % m
|
|
|
def generate_dsa_parameters():
    """Create a fresh set of DSA domain parameters and a key pair.

    Returns:
        A tuple ``(p, q, g, x, y)`` where ``q`` is a 160-bit probable prime,
        ``p`` is a 1024-bit probable prime of the form ``k * q + 1``, ``g`` is
        ``h ** ((p - 1) // q) mod p`` for a random ``h`` and is greater than 1,
        ``x`` is a random integer in ``[1, q)`` and ``y`` is ``g ** x mod p``.
    """
    # NOTE: all randomness here comes from the `random` module, which the
    # Python documentation says is not suitable for security purposes.
    subgroup_order = generate_prime(160)

    # Search for a 1024-bit prime modulus with subgroup_order | modulus - 1.
    modulus = None
    while modulus is None:
        cofactor = random.getrandbits(864)
        candidate = cofactor * subgroup_order + 1
        if candidate.bit_length() == 1024 and miller_rabin_test(candidate):
            modulus = candidate

    # Map random bases into the order-q subgroup until a non-trivial one shows up.
    subgroup_exponent = (modulus - 1) // subgroup_order
    generator = 1
    while not generator > 1:
        base = random.randrange(2, modulus - 1)
        generator = pow(base, subgroup_exponent, modulus)

    private_key = random.randrange(1, subgroup_order)
    public_key = pow(generator, private_key, modulus)
    return modulus, subgroup_order, generator, private_key, public_key
|
def hash_message(message):
    """Fold the characters of ``message`` into an integer digest.

    Each character's code point is mixed in with a multiply-by-31 polynomial
    step, and the running value is masked after every step.

    Args:
        message: String to hash.

    Returns:
        A non-negative integer no larger than the mask; 0 for an empty string.
    """
    mask = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
    digest = 0
    for code_point in map(ord, message):
        digest = digest * 31 + code_point
        digest &= mask
    return digest
|
def sign_message(message, p, q, g, x):
    """Produce a DSA-style signature for ``message``.

    Args:
        message: String to sign; hashed with ``hash_message``.
        p: Prime modulus.
        q: Subgroup order.
        g: Generator.
        x: Private key.

    Returns:
        A tuple ``(r, s, k)``: the two signature components, both non-zero,
        followed by the per-signature random value ``k`` that produced them.
    """
    digest = hash_message(message)

    # Retry with a new k until both r and s are non-zero.
    r = s = 0
    while s == 0:
        k = random.randrange(1, q)
        r = pow(g, k, p) % q
        if r == 0:
            continue
        s = (mod_inverse(k, q) * (digest + x * r)) % q
    return r, s, k
|
def _print_signing_details(p, q, g, message, digest, private_key, public_key, k, r, s):
    """Print the parameters, keys and signature values for one test case."""
    print(f"p: {p}")
    print(f"q: {q}")
    print(f"g: {g}")
    print(f"Message: {message}")
    print(f"Hash value: {digest}")
    print(f"Private key (x): {private_key}")
    print(f"Public key (y): {public_key}")
    print(f"k: {k}")
    print(f"(r,s) pair: ({r}, {s})")


def server_program():
    """Serve two signed test messages to every client that connects.

    Listens on 127.0.0.1:5000, generates one set of DSA parameters, and for
    each accepted connection sends two pickled dictionaries:

    1. a message signed with the server's own key, and
    2. a message signed with a freshly generated, unrelated key but shipped
       with the server's public parameters.

    Both sockets are managed by ``with`` blocks so they are closed on every
    exit path. Any exception is reported as ``Server error: ...`` and ends
    the server.
    """
    host = '127.0.0.1'
    port = 5000
    try:
        # Creating the socket inside the `with` means a failure in
        # socket.socket() cannot leave a cleanup step touching an unbound name.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            server_socket.bind((host, port))
            server_socket.listen(1)
            p, q, g, x, y = generate_dsa_parameters()
            while True:
                conn, _addr = server_socket.accept()
                with conn:
                    print("\nTest Case 1: Valid Signature")
                    message1 = "This is a legitimate message"
                    r1, s1, k1 = sign_message(message1, p, q, g, x)
                    h1 = hash_message(message1)
                    _print_signing_details(p, q, g, message1, h1, x, y, k1, r1, s1)
                    data1 = {
                        'message': message1,
                        'signature': (r1, s1),
                        'public_params': (p, q, g, y),
                        'test_case': 1
                    }
                    # sendall() keeps writing until the whole payload is out;
                    # send() may transmit only part of it.
                    # The two pickles go out back-to-back with no length
                    # prefix, so a receiver has to parse them from the byte
                    # stream rather than assume one recv() per message.
                    conn.sendall(pickle.dumps(data1))

                    print("\nTest Case 2: Invalid Signature")
                    message2 = "This message will be tampered"
                    # Sign with an unrelated key set; the payload below still
                    # carries the real public parameters.
                    p_fake, q_fake, g_fake, x_fake, y_fake = generate_dsa_parameters()
                    r2, s2, k2 = sign_message(message2, p_fake, q_fake, g_fake, x_fake)
                    h2 = hash_message(message2)
                    _print_signing_details(p, q, g, message2, h2, x_fake, y, k2, r2, s2)
                    data2 = {
                        'message': message2,
                        'signature': (r2, s2),
                        'public_params': (p, q, g, y),
                        'test_case': 2
                    }
                    conn.sendall(pickle.dumps(data2))
    except Exception as e:
        print(f"Server error: {e}")
|
|
|
# Start the signing server only when this file is run as a script.
if __name__ == "__main__":
    server_program()
|
|
|
|
|
| CLIENT (Receiver):
|
| import socket
|
| import pickle
|
|
|
def mod_inverse(a, m):
    """Return the multiplicative inverse of ``a`` modulo ``m``.

    Uses the iterative extended Euclidean algorithm, so operand size is not
    limited by the interpreter's recursion depth.

    Args:
        a: Integer to invert; it is reduced modulo ``m`` first, so negative
            values are accepted.
        m: Modulus; must be a positive integer.

    Returns:
        The integer ``x`` with ``0 <= x < m`` and ``(a * x) % m == 1 % m``.

    Raises:
        ValueError: If ``m`` is not positive, or if ``a`` and ``m`` are not
            coprime (no inverse exists).
    """
    if m <= 0:
        raise ValueError("modulus must be a positive integer")

    # Invariant: prev_coeff * a == prev_rem (mod m), cur_coeff * a == cur_rem (mod m).
    prev_rem, cur_rem = a % m, m
    prev_coeff, cur_coeff = 1, 0
    while cur_rem:
        quotient = prev_rem // cur_rem
        prev_rem, cur_rem = cur_rem, prev_rem - quotient * cur_rem
        prev_coeff, cur_coeff = cur_coeff, prev_coeff - quotient * cur_coeff

    # prev_rem is now gcd(a, m); an inverse exists only when it equals 1.
    if prev_rem != 1:
        raise ValueError("value is not invertible for the given modulus")
    return prev_coeff % m
|
def hash_message(message):
    """Fold the characters of ``message`` into an integer digest.

    Starting from 0, every character multiplies the running value by 31,
    adds the character's code point, and masks the result.

    Args:
        message: String to hash.

    Returns:
        A non-negative integer no larger than the mask; 0 for an empty string.
    """
    mask = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
    accumulator = 0
    position = 0
    while position < len(message):
        accumulator = (31 * accumulator + ord(message[position])) & mask
        position += 1
    return accumulator
|
def verify_signature(message, signature, p, q, g, y):
    """Check a DSA-style signature and expose the intermediate values.

    Args:
        message: String that was signed; hashed with ``hash_message``.
        signature: Pair ``(r, s)``.
        p: Prime modulus.
        q: Subgroup order.
        g: Generator.
        y: Signer's public key.

    Returns:
        ``(is_valid, w, u1, u2, v)``. When ``r`` or ``s`` lies outside the
        open interval ``(0, q)`` the result is
        ``(False, None, None, None, None)``; otherwise ``is_valid`` is
        ``v == r``.
    """
    r, s = signature

    # Reject out-of-range components before doing any arithmetic.
    if r <= 0 or r >= q or s <= 0 or s >= q:
        return False, None, None, None, None

    digest = hash_message(message)
    w = mod_inverse(s, q)
    u1 = (digest * w) % q
    u2 = (r * w) % q

    g_term = pow(g, u1, p)
    y_term = pow(y, u2, p)
    v = ((g_term * y_term) % p) % q
    return v == r, w, u1, u2, v
|
def client_program():
    """Connect to the signing server and verify the two test messages.

    Connects to 127.0.0.1:5000, reads two pickled dictionaries from the
    connection, verifies each signature with ``verify_signature`` and prints
    the intermediate values and the verdict. Any exception is reported as
    ``Client error: ...``.
    """
    host = '127.0.0.1'
    port = 5000
    try:
        # Creating the socket inside the `with` means a failure in
        # socket.socket() cannot leave a cleanup step touching an unbound name.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
            client_socket.connect((host, port))
            # TCP is a byte stream: one recv() may return part of a pickle or
            # several pickles at once. Reading through a file object lets
            # pickle.load() consume exactly one object per call.
            with client_socket.makefile('rb') as stream:
                for test_case in [1, 2]:
                    # SECURITY: unpickling can execute arbitrary code. This is
                    # only acceptable while the peer is a trusted local
                    # process; use a data-only format for untrusted peers.
                    data = pickle.load(stream)
                    message = data['message']
                    signature = data['signature']
                    p, q, g, y = data['public_params']
                    r, s = signature
                    is_valid, w, u1, u2, v = verify_signature(message, signature, p, q, g, y)
                    print(f"\nTest Case {test_case}:")
                    print(f"Message: {message}")
                    print(f"w: {w}")
                    print(f"u1: {u1}")
                    print(f"u2: {u2}")
                    print(f"v: {v}")
                    print(f"Compare r with v: r = {r}, v = {v}, {'Equal' if r == v else 'Not Equal'}")
                    if is_valid:
                        print(f"Test Case {test_case}: Signature is VALID")
                    else:
                        print(f"Test Case {test_case}: Signature is INVALID")
    except Exception as e:
        print(f"Client error: {e}")
|
|
|
# Start the verifying client only when this file is run as a script.
if __name__ == "__main__":
    client_program()
|