Load test for XMLRPC API of DevPlace.NET
📝 Python code that runs XML-RPC clients concurrently to load-test the DevPlace.NET XML-RPC API.
Python
"""
XML-RPC Load Test for DevPlace.net
Tests concurrent reads and writes via XML-RPC protocol.
Adjust the API key if expired.
"""
import concurrent.futures
import json
import sys
import threading
import time
import xmlrpc.client
# --- Load-test configuration -------------------------------------------------
# Credential sent with every write request; replace the placeholder.
API_KEY = "<Your API key, it is inyour profile page>"
# XML-RPC endpoint under test.
URL = "https://devplace.net/xmlrpc"
# Total number of read+write cycles to run.
NUM_REQUESTS = 200
# Number of parallel workers in the thread pool.
CONCURRENCY = 20

# Module-level XML-RPC client bound to URL.
proxy = xmlrpc.client.ServerProxy(
    URL,
    allow_none=True,
    use_builtin_types=True,
)
# One ServerProxy per worker thread. A proxy keeps a single HTTP connection in
# its transport, and that connection cannot safely carry requests issued from
# several threads at once, so each thread lazily creates and reuses its own.
_thread_state = threading.local()


def _thread_proxy():
    """Return the calling thread's own ServerProxy, creating it on first use."""
    client = getattr(_thread_state, "proxy", None)
    if client is None:
        client = xmlrpc.client.ServerProxy(
            URL, allow_none=True, use_builtin_types=True
        )
        _thread_state.proxy = client
    return client


def do_batch(batch_id):
    """Run one full cycle: read the feed, then write a post.

    Failures are recorded in the returned dict instead of being raised, so a
    single bad cycle never aborts the whole load test.

    Args:
        batch_id: Identifier of this cycle; echoed in the result and embedded
            in the content of the created post.

    Returns:
        A dict with the keys ``batch``, ``read_ok`` (number of feed items
        read), ``read_fail``, ``write_ok``, ``write_fail`` and
        ``authors_seen`` (set of usernames found in the feed). Depending on
        the outcome it also carries ``redirect``, ``write_error``,
        ``error_read`` or ``error_write``.
    """
    results = {
        "batch": batch_id,
        "read_ok": 0,
        "read_fail": 0,
        "write_ok": 0,
        "write_fail": 0,
        "authors_seen": set(),
    }

    # --- Read phase ---
    try:
        feed = _thread_proxy().feed.list({})
        for item in feed.get("posts", []):
            # Tolerate a missing or null author entry; fall back to "?".
            author = (item.get("author") or {}).get("username", "?")
            results["authors_seen"].add(author)
            results["read_ok"] += 1
    except Exception as e:  # deliberately broad: any failure is a data point
        results["read_fail"] += 1
        results["error_read"] = str(e)

    # --- Write phase ---
    try:
        result = _thread_proxy().posts.create({
            "content": f"Hello from XML-RPC load test batch {batch_id}",
            "api_key": API_KEY,
        })
        if result.get("ok"):
            results["write_ok"] += 1
            results["redirect"] = result.get("redirect", "")
        else:
            # The server answered but rejected the post; keep its reply.
            results["write_fail"] += 1
            results["write_error"] = str(result)
    except Exception as e:  # deliberately broad: any failure is a data point
        results["write_fail"] += 1
        results["error_write"] = str(e)

    return results
def _percent(part, whole):
    """Return ``part / whole`` as a percentage, or 0.0 when ``whole`` is 0."""
    return part / whole * 100 if whole > 0 else 0.0


def run_load_test(num_requests=None, concurrency=None, worker=None):
    """Run the cycles on a thread pool, print a summary and return the stats.

    Progress is written to stderr; the final summary is printed to stdout.

    Args:
        num_requests: Number of read+write cycles to run. Defaults to
            ``NUM_REQUESTS``.
        concurrency: Number of worker threads. Defaults to ``CONCURRENCY``.
        worker: Callable taking the cycle index and returning a dict with the
            counters ``read_ok``, ``read_fail``, ``write_ok`` and
            ``write_fail``. Defaults to ``do_batch``.

    Returns:
        A dict with ``elapsed_s``, ``total_cycles``, ``writes_ok``,
        ``writes_fail``, ``reads_total``, ``read_errors``, ``throughput``,
        ``write_success_pct`` and ``read_success_pct``.
    """
    if num_requests is None:
        num_requests = NUM_REQUESTS
    if concurrency is None:
        concurrency = CONCURRENCY
    if worker is None:
        worker = do_batch

    # perf_counter is monotonic, so elapsed time cannot go negative or jump
    # when the wall clock is adjusted during the run.
    total_start = time.perf_counter()
    success_writes = 0
    fail_writes = 0
    reads_total = 0      # feed items read across all cycles
    read_errors = 0      # failed reads
    read_cycles_ok = 0   # cycles whose read phase did not fail

    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = [executor.submit(worker, i) for i in range(num_requests)]
        completed = concurrent.futures.as_completed(futures)
        for done, future in enumerate(completed, start=1):
            try:
                r = future.result()
                # Read every counter before accumulating, so a malformed
                # result cannot leave the totals half-updated.
                counts = (r["read_ok"], r["read_fail"], r["write_ok"], r["write_fail"])
            except Exception:
                # The worker crashed or returned no counters; count the
                # cycle as a failed write.
                fail_writes += 1
            else:
                reads_total += counts[0]
                read_errors += counts[1]
                success_writes += counts[2]
                fail_writes += counts[3]
                if counts[1] == 0:
                    read_cycles_ok += 1

            if done % 100 == 0 or done == num_requests:
                elapsed = time.perf_counter() - total_start
                rate = done / elapsed if elapsed > 0 else 0
                sys.stderr.write(f"\rProgress: {done}/{num_requests} | "
                                 f"Writes: {success_writes} OK / {fail_writes} FAIL | "
                                 f"Reads: {reads_total} / {read_errors} err | "
                                 f"Rate: {rate:.1f} req/s | Elapsed: {elapsed:.1f}s")
                sys.stderr.flush()

    total_elapsed = time.perf_counter() - total_start
    throughput = num_requests / total_elapsed if total_elapsed > 0 else 0.0
    write_success_pct = _percent(success_writes, num_requests)
    # Success rate is per read attempt, not per feed item, so item counts and
    # error counts are never mixed in one ratio.
    read_success_pct = _percent(read_cycles_ok, read_cycles_ok + read_errors)

    print("\n\n=== LOAD TEST COMPLETE ===")
    print(f"Total time: {total_elapsed:.2f}s")
    print(f"Total requests (read+write cycles): {num_requests}")
    print(f"Successful writes: {success_writes}")
    print(f"Failed writes: {fail_writes}")
    print(f"Total feed items read: {reads_total}")
    print(f"Read errors: {read_errors}")
    print(f"Average throughput: {throughput:.1f} cycles/s")
    print(f"Write success rate: {write_success_pct:.1f}%")
    print(f"Read success rate: {read_success_pct:.1f}%")

    return {
        "elapsed_s": total_elapsed,
        "total_cycles": num_requests,
        "writes_ok": success_writes,
        "writes_fail": fail_writes,
        "reads_total": reads_total,
        "read_errors": read_errors,
        "throughput": throughput,
        "write_success_pct": round(write_success_pct, 1),
        "read_success_pct": round(read_success_pct, 1),
    }
if __name__ == "__main__":
    # Run the load test, then emit its summary stats as pretty-printed JSON.
    print(json.dumps(run_load_test(), indent=2))
Comments
No comments yet. Start the discussion.