Problem description
I understand that this question duplicates the question "using rabbitmq to send a message not string but struct".
When I try to do this the first way described there, I get the following stack trace:
java.io.EOFException
    at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2304)
    at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2773)
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:798)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:298)
    at com.mdnaRabbit.worker.data.Data.fromBytes(Data.java:78)
    at com.mdnaRabbit.worker.App.main(App.java:41)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
I've checked, and I'm sure that the message is converted to bytes correctly in the sender class, but the consumer can't receive it.
Here is my producer class:
package com.mdnaRabbit.newt;

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.apache.commons.lang.SerializationUtils;
import com.mdnaRabbit.worker.data.Data;

public class App {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        int i = 0;
        do {
            Data message = getMessage();
            byte[] byteMessage = message.getBytes();
            //System.out.println(byteMessage);
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, byteMessage);
            System.out.println(" [" + (i+1) + "] message Sent" + Data.fromBytes(byteMessage).getBody());
            i++;
        } while (i < 15);
        channel.close();
        connection.close();
    }

    private static Data getMessage() {
        Data data = new Data();
        data.setHeader("header");
        data.setDomainId("abc.com");
        data.setReceiver("me");
        data.setSender("he");
        data.setBody("body");
        return data;
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}
Here is my consumer class:
package com.mdnaRabbit.worker;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.mdnaRabbit.worker.data.Data;
import org.apache.commons.lang.SerializationUtils;

public class App {
    private static final String TASK_QUEUE_NAME = "task_queue";
    private static int i = 0;

    public static void main(String[] argv)
            throws IOException,
            InterruptedException {
        ExecutorService threader = Executors.newFixedThreadPool(20);
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection(threader);
        final Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        channel.basicQos(20);
        final QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
        try {
            while (true) {
                try {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    Data message = Data.fromBytes(delivery.getBody());
                    //Data message = (Data) SerializationUtils.deserialize(delivery.getBody());
                    System.out.println(" [" + (i++) + "] Received" + message.getBody());
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (Exception e) {
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        channel.close();
        connection.close();
    }
}
Here is my Data class:
package com.mdnaRabbit.worker.data;

import java.io.*;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Data implements Serializable {
    public String header;
    public String body;
    public String domainId;
    public String sender;
    public String receiver;

    public void setHeader(String head) {
        this.header = head;
    }

    public String getHeader() {
        return header;
    }

    public void setBody(String body) {
        this.body = body;
    }

    public String getBody() {
        return body;
    }

    public void setDomainId(String domainId) {
        this.domainId = domainId;
    }

    public String getDomainId() {
        return domainId;
    }

    public void setSender(String sender) {
        this.sender = sender;
    }

    public String getSender() {
        return sender;
    }

    public String getReceiver() {
        return receiver;
    }

    public void setReceiver(String receiver) {
        this.receiver = receiver;
    }

    public byte[] getBytes() {
        byte[] bytes;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(this);
            oos.flush();
            oos.reset();
            bytes = baos.toByteArray();
            oos.close();
            baos.close();
        } catch (IOException e) {
            bytes = new byte[] {};
            Logger.getLogger("bsdlog").log(Level.ALL, "unable to write to output stream" + e);
        }
        return bytes;
    }

    public static Data fromBytes(byte[] body) {
        Data obj = null;
        try {
            ByteArrayInputStream bis = new ByteArrayInputStream(body);
            ObjectInputStream ois = new ObjectInputStream(bis);
            obj = (Data) ois.readObject();
            ois.close();
            bis.close();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        catch (ClassNotFoundException ex) {
            ex.printStackTrace();
        }
        return obj;
    }
}
It seems that the consumer always receives the messages, because when I don't try to convert them into an object and just write
System.out.println(delivery.getBody())
it prints bytes.
Recommended answer
It looks like the byte array you receive is empty. This happens because of this:
} catch(IOException e){
    bytes = new byte[] {};
}
When an exception is thrown, the code doesn't warn you that something is broken; it just sends an empty array instead. You should at least log the error.
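To illustrate why an empty array leads to the trace in the question: the ObjectInputStream constructor reads a stream header first, so an empty payload fails there with java.io.EOFException. Below is a minimal sketch, assuming Java 8 or later. The names SerializationSketch, toBytes and failsWithEof are hypothetical and not part of the original code; the serializer logs at SEVERE and rethrows instead of quietly returning an empty array.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper for illustration only; not part of the original project.
final class SerializationSketch {
    private static final Logger LOG = Logger.getLogger("bsdlog");

    // Serializes a value. On failure it logs at SEVERE and rethrows,
    // so a broken message is never published as an empty byte array.
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(value);
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "unable to serialize message", e);
            throw new UncheckedIOException(e);
        }
        // The stream is closed (and therefore flushed) at this point.
        return baos.toByteArray();
    }

    // Returns true if deserializing the payload fails with EOFException.
    static boolean failsWithEof(byte[] payload) {
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(payload))) {
            ois.readObject();
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException | ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // An empty payload cannot even provide the stream header.
        System.out.println(failsWithEof(new byte[] {}));    // prints "true"
        // A properly serialized value round-trips without EOFException.
        System.out.println(failsWithEof(toBytes("body")));  // prints "false"
    }
}
```

On the consumer side, a similar check of delivery.getBody().length before deserializing would show whether the message really arrived empty.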
The exception is probably thrown because you are trying to serialize a class that is not serializable. To make a class serializable, you have to add "implements Serializable" to its declaration:
public class Data implements Serializable {
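As a rough check of this, the sketch below tries to serialize two small classes, one with and one without the Serializable marker. The names SerializableCheck, PlainData, SerializableData and canSerialize are hypothetical stand-ins for illustration, not the real Data class; the code assumes Java 7 or later for try-with-resources.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class; the nested types stand in for the real Data class.
final class SerializableCheck {

    // No Serializable marker: writeObject rejects instances of this class.
    static class PlainData {
        String body = "body";
    }

    // Same field, but marked Serializable.
    static class SerializableData implements Serializable {
        private static final long serialVersionUID = 1L;
        String body = "body";
    }

    // Returns true if Java serialization accepts the value.
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream oos =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            // Thrown when the class does not implement Serializable.
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new PlainData()));        // prints "false"
        System.out.println(canSerialize(new SerializableData())); // prints "true"
    }
}
```

NotSerializableException is a subclass of IOException, so a catch block like the one quoted above would swallow it and return the empty array.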