Spring Amqp ve RabbitMq

Bir önceki yazıda Amqp’nin ne olduğuna, kısaca geliştirme ortamınız için nasıl RabbitMQ kurulumu yapabileceğinize değinmiştik. Bu yazıda ise bir seviye daha ileriye gidip spring-amqp kullanarak, RabbitMQ ile nasıl haberleşebileceğimize bakalım.

Projemiz basit bir “Hello, World” tipi bir uygulama olucak. Hello, World projesinden tek farkı doğrudan string formatında mesajlar göndermek yerine POJO’lar gönderip alacağız. Zaten sadece String gönderen bir örnek arıyorsanız, Spring-Amqp’nin kendi sitesinde ilgili örneği bulabilirsiniz. Projeyi geliştirirken spring-boot ve spring-amqp kütüphanelerini kullanacağız. Ben geliştirme ortamı olarak Intellij 14 kullanıyorum. Temel sebebi spring-boot projelerini çok rahatlıkla oluşturabilmenizdir. Fakat isterseniz https://start.spring.io adresinden de aynı şekilde bir porje oluşturabilir ve bunu favori geliştirme ortamınıza ekleyebilirsiniz. Build aracı olarak her zamanki gibi maven kullanacağım. Spring boot bizim için herşeyi hallettiğinden, pom.xml ile ya da başka herhangibir xml dosyası ile uğraşmayacağız. Ondan gradle kullanmamanız için de bir sebep yok.

Projenin Yaratılması

Projemizin ne olduğuna kısaca değindiğimize göre şimdi projemizi nasıl yaratılacağına değinebiliriz. Dediğim gibi ben intellij kullanarak anlatacağım ama start.spring.io üzerinden de aynı işlemleri gerçekleyebilirsiniz.

Öncelikle yeni bir proje yaratıyoruz. Proje tipi olarak Spring Initializr seçiyoruz.

Daha sonra gelen ekranda hangi kütüphaneleri kullanacağımızı seçiyoruz. Biz sadece spring-amqp kullanacağımız için, sadece amqp’yi seçiyoruz.

Daha sonra gelen ekranda proje bilgilerini dolduruyoruz. Ben aşağıdaki şekilde doldurdum.

Böylelikle projemizi yaratmış oluyoruz.

Model ve Consumer

Şimdi projemizi yarattığımıza göre sırada mesajlaşmada kullanacağımız modelimizi ve bu modeli tüketecek sınıfımızı geliştirelim.

Model sınıfımız için src/main/java dizinine com.bahadirakin.amqp.message isminde bir paket yaratıyoruz. Daha sonra bu paketin içerisine HelloMessage isminde bir sınıf yaratıp içeriğini aşağıdaki gibi yapıyoruz.

HelloMessage.java

Burada dikkat etmenizi istediğim tek nokta POJO’muzun Serializable olarak işaretlenmiş oluşudur. Daha önceki yazıda belirttiğim gibi AMQP sadece ikilik sistemde mesajlar kabul ediyor. Biz mesajımızı java byte-code’a dönüştürerek göndereceğiz.

Daha sonra yine src/main/java dizinin altına com.bahadirakin.amqp.consumer isminde bir paket yaratıp içerisine HelloMessageConsumer isminde bir sınıf yaratıyoruz. Yarattığımız sınıfın içeriğini aşağıdaki gibi yapıyoruz.

HelloMessageConsumer.java

Burada bir kaç konuya değinmek istiyorum.

  • Öncelikle bu sınıf, uygulamamızı ayarlarken bean olarak tanımlanacak.
  • “onHelloMessage” metod adı bizim için kritik. Çünkü hangi kuyruğu dinleyeceğimizi belirtirken, yeni mesaj geldiğinde hangi metodun tetikleneceğini de belirtmemiz gerekiyor.
  • Kuyruklardaki mesajları dinlemenizin tek yolu bu değil. Eğer isterseniz mesajları kendinizde yönetebilirsiniz. Bunun için ilgili sınıfınızın MessageListener interface’ini kullanması gerekiyor.

Uygulamanın Ayarlanması

Şimdi uygulamamızı nasıl ayarlayacağımıza bakalım. Eğer projenizi benimle aynı değerleri kullanarak oluşturduysanız, src/main/java dizininin altında, com.bahadirakin.amqp paketinin bulunduğunu ve bu paketin içerisinde SpringRabbitmqApplication isminde bir sınıfın bulunduğunu göreceksiniz. Tüm ayarlarımızı annotsayon tabanlı olarak yapacağız ve bu sınıfın içerisine giriyoruz. Önce bu sınıfımızı bir güncelleyelim, daha sonra ise neyin ne analama geldiğine değinelim.

SpringRabbitmqApplication.java

  •  Kendi kuyruğumuzu ve DirectExchange’imizi yaratıyoruz. Daha önceki yazıda bahsetmiştik. DirectExchange’ler point-to-point mimariler geliştirmek için kullanılıyor. Benim bunu tercih etmemin nedeni diğerlerinden daha basit bir şekilde gerçeklenebilmesi. Gördüğünüz gibi exchange ile queue arasındaki ilişki (binding) bir anahtar ile sağlanmış.
  • Yine önceki yazıda, istemcilerin kuyruklardan ve kullanılan mimarilerden haberdar olması gerek miyor. Zaten run methodunda da mesajı bir exchange adına yine bir anahtar kelime ile gönderiyorlar. Bu exchange’in ne tür olduğu ya da arka tarafta kaç farklı kuyruğa iletileceği bilinmiyor.
  • Consumer nesnemiz gördüğünüz gibi bir Bean olarak yaratılmış. Daha sonra bir adapter içerisinde ilgili kuyruğa atanmış. Burada dinleyen taraflar ise hangi kuyruğu dinlediklerini biliyorlar sadece. Mesajın bu kuyruğa nasıl geldiği ile ilgili bir bilgileri yok. Yine gördüğünüz gibi bir adapter kullandığımızdan hangi metodun tetikleneceği metod adı ile belirtiliyor.
  • Kuyruğumuzu gördüğünüz gibi “durable” olarak yarattık. Bunu bir boolean değer ileterek yaptık. “durable”‘ bildiğim kadarıyla JMS mimarilerinde olmayan bir şey. “Durable”ın anlamı, “eğer bu kuyruğu dinleyen hiç bir consumer yoksa kuyruğu sil” şekilde özetlenebilir.
  • run metodunda örnek mesajımızı gönderiyoruz. Daha sonra bu mesajın consumer sınıfına düşmesini bekliyoruz. Mesajımızın içeriği Pojo fakat biz önceki yazımızda AMQP’nin sadece ikilik sistemde konuştuğunu belirtmiştik. Burada, başka bir yöntem belirtmediğimiz için, nesnemiz java byte-code’una dönüştürülüyor. Her iki tarafta java uygulaması olduğundan bir sorun yok. Fakat farklı uygulamalar olsaydı sorun çıkabilirdi. Tabi ki tek yöntem java byte-code’una çevirmek değil. İsterseniz json ve xml’e çevirip bunları byte codu şekilde gönderebilirsiniz. Jackson ve Jaxb için ilgili çeviriciler spring-amqp’de mevcut. Sizin tek yapmanız gereken nesnelerinizi yazarken ilgili annotasyonları eklemek oluyor.

Spring Context’imizi ayarladığımıza göre, son olarak bağlantı bilgilerimizi ayarlayabiliriz. Bunun için application.properties dosyasına aşağıdaki satırları eklemeniz yeterli.

application.properties

Ben önceki yazıda anlattığım, vagrant ile ayağa kaldırdığım, rabbitmq’um erişim adreslerini kullanıyorum.

 Son

Şimdi projemizi çalıştırabiliriz. Bunun için SpringRabbitmqApplication sınıfını komut satırı uygulaması gibi çalıştırmamız yeterli. Komut satırında da beklediğimiz şekilde “Hello from bhdrkn” mesajını görüyoruz. Buradan sonra farklı mesajlaşma mimarilerinin nasıl kurulacağına bakmanızı tavsiye ederim. Aynı şekilde farklı header bilgilerini kullanarak mesajın kuyrukta ne kadar süreyle yaşayacağını ya da mesaj expire olduktan sonra hangi kuyruğa yönlendirilmesi gerektiğini ayarlayabiliyorsunuz. Mimarinizi kurmadan önce bunları da incelemenizi tavsiye ederim.

Projenin tamanına ise aşağıdaki adresten ulaşabilirsiniz.

 

 

  • Mehmet Tayyar Kuyucu

    Selam Bahadır ,

    Öncelikle yazın için teşekkür ederim gayet açıklayıcı. Ancak ben bir durumu daha gerçekleştirmek istiyorum . Listener da hata oluşması durumunda hata oluşan task ın yeniden kuyruğa atılmasını ve belirli bir hata sayısına ulaştığında da kuyruktan silinmesini istiyorum. Bu mümkün mü ?